You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2016/06/02 10:52:58 UTC
[1/4] cassandra git commit: Run CommitLog tests with different compression settings
Repository: cassandra
Updated Branches:
refs/heads/trunk eb5a59a31 -> adfbf518e
Run CommitLog tests with different compression settings
patch by Benjamin Lerer; reviewed by Branimir Lambov for CASSANDRA-9039
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6c445d6b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6c445d6b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6c445d6b
Branch: refs/heads/trunk
Commit: 6c445d6b7f3c8933a0bfd599ba8455b7254a323d
Parents: b8f5c1f
Author: Benjamin Lerer <b....@gmail.com>
Authored: Thu Jun 2 12:31:31 2016 +0200
Committer: Benjamin Lerer <b....@gmail.com>
Committed: Thu Jun 2 12:31:31 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/db/commitlog/CommitLog.java | 71 ++++++++++-
.../db/commitlog/CommitLogSegment.java | 5 +-
.../db/commitlog/CommitLogSegmentManager.java | 15 ++-
.../db/commitlog/CompressedSegment.java | 6 +-
.../db/commitlog/CommitLogStressTest.java | 2 +-
.../cassandra/db/RecoveryManager2Test.java | 36 ++++++
.../cassandra/db/RecoveryManager3Test.java | 33 +++++
.../cassandra/db/RecoveryManagerTest.java | 42 +++++--
.../db/RecoveryManagerTruncateTest.java | 35 ++++++
.../db/commitlog/CommitLogDescriptorTest.java | 103 ++++++++++++++++
.../cassandra/db/commitlog/CommitLogTest.java | 121 ++++++-------------
.../db/commitlog/CommitLogUpgradeTestMaker.java | 2 +-
13 files changed, 358 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c97293d..9752d16 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.7
+ * Run CommitLog tests with different compression settings (CASSANDRA-9039)
* cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664)
* Avoid showing estimated key as -1 in tablestats (CASSANDRA-11587)
* Fix possible race condition in CommitLog.recover (CASSANDRA-11743)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 9a6ba34..460ecfe 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -70,8 +70,7 @@ public class CommitLog implements CommitLogMBean
final CommitLogMetrics metrics;
final AbstractCommitLogService executor;
- final ICompressor compressor;
- public ParameterizedClass compressorClass;
+ volatile Configuration configuration;
final public String location;
static private CommitLog construct()
@@ -93,12 +92,10 @@ public class CommitLog implements CommitLogMBean
@VisibleForTesting
CommitLog(String location, CommitLogArchiver archiver)
{
- compressorClass = DatabaseDescriptor.getCommitLogCompression();
this.location = location;
- ICompressor compressor = compressorClass != null ? CompressionParameters.createCompressor(compressorClass) : null;
+ this.configuration = new Configuration(DatabaseDescriptor.getCommitLogCompression());
DatabaseDescriptor.createAllDirectories();
- this.compressor = compressor;
this.archiver = archiver;
metrics = new CommitLogMetrics();
@@ -412,6 +409,7 @@ public class CommitLog implements CommitLogMBean
public int resetUnsafe(boolean deleteSegments) throws IOException
{
stopUnsafe(deleteSegments);
+ resetConfiguration();
return restartUnsafe();
}
@@ -434,6 +432,14 @@ public class CommitLog implements CommitLogMBean
}
/**
+ * FOR TESTING PURPOSES.
+ */
+ public void resetConfiguration()
+ {
+ this.configuration = new Configuration(DatabaseDescriptor.getCommitLogCompression());
+ }
+
+ /**
* FOR TESTING PURPOSES. See CommitLogAllocator
*/
public int restartUnsafe() throws IOException
@@ -488,4 +494,59 @@ public class CommitLog implements CommitLogMBean
throw new AssertionError(DatabaseDescriptor.getCommitFailurePolicy());
}
}
+
+ public static final class Configuration
+ {
+ /**
+ * The compressor class.
+ */
+ private final ParameterizedClass compressorClass;
+
+ /**
+ * The compressor used to compress the segments.
+ */
+ private final ICompressor compressor;
+
+ public Configuration(ParameterizedClass compressorClass)
+ {
+ this.compressorClass = compressorClass;
+ this.compressor = compressorClass != null ? CompressionParameters.createCompressor(compressorClass) : null;
+ }
+
+ /**
+ * Checks if the segments must be compressed.
+ * @return <code>true</code> if the segments must be compressed, <code>false</code> otherwise.
+ */
+ public boolean useCompression()
+ {
+ return compressor != null;
+ }
+
+ /**
+ * Returns the compressor used to compress the segments.
+ * @return the compressor used to compress the segments
+ */
+ public ICompressor getCompressor()
+ {
+ return compressor;
+ }
+
+ /**
+ * Returns the compressor class.
+ * @return the compressor class
+ */
+ public ParameterizedClass getCompressorClass()
+ {
+ return compressorClass;
+ }
+
+ /**
+ * Returns the compressor name.
+ * @return the compressor name.
+ */
+ public String getCompressorName()
+ {
+ return useCompression() ? compressor.getClass().getSimpleName() : "none";
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index b6801d2..ba28f3e 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -119,7 +119,8 @@ public abstract class CommitLogSegment
static CommitLogSegment createSegment(CommitLog commitLog)
{
- return commitLog.compressor != null ? new CompressedSegment(commitLog) : new MemoryMappedSegment(commitLog);
+ return commitLog.configuration.useCompression() ? new CompressedSegment(commitLog)
+ : new MemoryMappedSegment(commitLog);
}
static long getNextId()
@@ -136,7 +137,7 @@ public abstract class CommitLogSegment
{
this.commitLog = commitLog;
id = getNextId();
- descriptor = new CommitLogDescriptor(id, commitLog.compressorClass);
+ descriptor = new CommitLogDescriptor(id, commitLog.configuration.getCompressorClass());
logFile = new File(commitLog.location, descriptor.fileName());
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index 636c73b..8670fd7 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -491,13 +491,16 @@ public class CommitLogSegmentManager
throw new RuntimeException(e);
}
- for (CommitLogSegment segment : activeSegments)
- closeAndDeleteSegmentUnsafe(segment, deleteSegments);
- activeSegments.clear();
+ synchronized (this)
+ {
+ for (CommitLogSegment segment : activeSegments)
+ closeAndDeleteSegmentUnsafe(segment, deleteSegments);
+ activeSegments.clear();
- for (CommitLogSegment segment : availableSegments)
- closeAndDeleteSegmentUnsafe(segment, deleteSegments);
- availableSegments.clear();
+ for (CommitLogSegment segment : availableSegments)
+ closeAndDeleteSegmentUnsafe(segment, deleteSegments);
+ availableSegments.clear();
+ }
allocatingFrom = null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index 8c62536..219709b 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@ -61,7 +61,7 @@ public class CompressedSegment extends CommitLogSegment
CompressedSegment(CommitLog commitLog)
{
super(commitLog);
- this.compressor = commitLog.compressor;
+ this.compressor = commitLog.configuration.getCompressor();
try
{
channel.write((ByteBuffer) buffer.duplicate().flip());
@@ -84,7 +84,9 @@ public class CompressedSegment extends CommitLogSegment
if (buf == null)
{
// this.compressor is not yet set, so we must use the commitLog's one.
- buf = commitLog.compressor.preferredBufferType().allocate(DatabaseDescriptor.getCommitLogSegmentSize());
+ buf = commitLog.configuration.getCompressor()
+ .preferredBufferType()
+ .allocate(DatabaseDescriptor.getCommitLogSegmentSize());
} else
buf.clear();
return buf;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index f9b4156..4604c49 100644
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@ -218,7 +218,7 @@ public class CommitLogStressTest
{
System.out.format("\nTesting commit log size %.0fmb, compressor %s, sync %s%s%s\n",
mb(DatabaseDescriptor.getCommitLogSegmentSize()),
- commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
+ commitLog.configuration.getCompressorName(),
commitLog.executor.getClass().getSimpleName(),
randomSize ? " random size" : "",
discardedRun ? " with discarded run" : "");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java b/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java
index 13c3452..3beb28e 100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManager2Test.java
@@ -21,22 +21,37 @@ package org.apache.cassandra.db;
*/
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.cassandra.Util.column;
import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.compress.DeflateCompressor;
+import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.compress.SnappyCompressor;
import org.apache.cassandra.locator.SimpleStrategy;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
+@RunWith(Parameterized.class)
public class RecoveryManager2Test
{
private static Logger logger = LoggerFactory.getLogger(RecoveryManager2Test.class);
@@ -56,6 +71,27 @@ public class RecoveryManager2Test
SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2));
}
+ public RecoveryManager2Test(ParameterizedClass commitLogCompression)
+ {
+ DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
+ }
+
+ @Before
+ public void setUp() throws IOException
+ {
+ CommitLog.instance.resetUnsafe(true);
+ }
+
+ @Parameters()
+ public static Collection<Object[]> generateData()
+ {
+ return Arrays.asList(new Object[][] {
+ { null }, // No compression
+ { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.<String, String>emptyMap()) },
+ { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.<String, String>emptyMap()) },
+ { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap()) } });
+ }
+
@Test
/* test that commit logs do not replay flushed data */
public void testWithFlush() throws Exception
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java b/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java
index a94d94d..2dd7eae 100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManager3Test.java
@@ -23,22 +23,34 @@ package org.apache.cassandra.db;
import java.io.File;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.compress.DeflateCompressor;
+import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.compress.SnappyCompressor;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.locator.SimpleStrategy;
import static org.apache.cassandra.Util.column;
import static org.apache.cassandra.db.KeyspaceTest.assertColumns;
+@RunWith(Parameterized.class)
public class RecoveryManager3Test
{
private static final String KEYSPACE1 = "RecoveryManager3Test1";
@@ -47,6 +59,27 @@ public class RecoveryManager3Test
private static final String KEYSPACE2 = "RecoveryManager3Test2";
private static final String CF_STANDARD3 = "Standard3";
+ public RecoveryManager3Test(ParameterizedClass commitLogCompression)
+ {
+ DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
+ }
+
+ @Before
+ public void setUp() throws IOException
+ {
+ CommitLog.instance.resetUnsafe(true);
+ }
+
+ @Parameters()
+ public static Collection<Object[]> generateData()
+ {
+ return Arrays.asList(new Object[][] {
+ { null }, // No compression
+ { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.<String, String>emptyMap()) },
+ { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.<String, String>emptyMap()) },
+ { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap()) } });
+ }
+
@BeforeClass
public static void defineSchema() throws ConfigurationException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
index c9abe0d..5676b99 100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
@@ -19,31 +19,38 @@
package org.apache.cassandra.db;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Date;
import java.util.concurrent.TimeUnit;
-import org.apache.cassandra.OrderedJUnit4ClassRunner;
-import org.apache.cassandra.Util;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.marshal.CounterColumnType;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.locator.SimpleStrategy;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.CommitLogArchiver;
+import org.apache.cassandra.db.marshal.CounterColumnType;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.compress.DeflateCompressor;
+import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.compress.SnappyCompressor;
+import org.apache.cassandra.locator.SimpleStrategy;
+import static org.apache.cassandra.Util.cellname;
import static org.apache.cassandra.Util.column;
import static org.apache.cassandra.db.KeyspaceTest.assertColumns;
-import static org.apache.cassandra.Util.cellname;
-@RunWith(OrderedJUnit4ClassRunner.class)
+@RunWith(Parameterized.class)
public class RecoveryManagerTest
{
private static final String KEYSPACE1 = "RecoveryManagerTest1";
@@ -68,6 +75,21 @@ public class RecoveryManagerTest
SchemaLoader.standardCFMD(KEYSPACE2, CF_STANDARD3));
}
+ public RecoveryManagerTest(ParameterizedClass commitLogCompression)
+ {
+ DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
+ }
+
+ @Parameters()
+ public static Collection<Object[]> generateData()
+ {
+ return Arrays.asList(new Object[][] {
+ { null }, // No compression
+ { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.<String, String>emptyMap()) },
+ { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.<String, String>emptyMap()) },
+ { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap()) } });
+ }
+
@Test
public void testNothingToRecover() throws IOException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
index a004105..769316f 100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
@@ -22,26 +22,61 @@ import static org.apache.cassandra.Util.column;
import static org.junit.Assert.*;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.compress.DeflateCompressor;
+import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.compress.SnappyCompressor;
import org.apache.cassandra.locator.SimpleStrategy;
import org.apache.cassandra.utils.ByteBufferUtil;
/**
* Test for the truncate operation.
*/
+@RunWith(Parameterized.class)
public class RecoveryManagerTruncateTest
{
private static final String KEYSPACE1 = "RecoveryManagerTruncateTest";
private static final String CF_STANDARD1 = "Standard1";
private static final String CF_STANDARD2 = "Standard2";
+ public RecoveryManagerTruncateTest(ParameterizedClass commitLogCompression)
+ {
+ DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
+ }
+
+ @Before
+ public void setUp() throws IOException
+ {
+ CommitLog.instance.resetUnsafe(true);
+ }
+
+ @Parameters()
+ public static Collection<Object[]> generateData()
+ {
+ return Arrays.asList(new Object[][] {
+ { null }, // No compression
+ { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.<String, String>emptyMap()) },
+ { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.<String, String>emptyMap()) },
+ { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap()) } });
+ }
+
@BeforeClass
public static void defineSchema() throws ConfigurationException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
new file mode 100644
index 0000000..8d63959
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.ByteBufferDataInput;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.net.MessagingService;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class CommitLogDescriptorTest
+{
+ @Test
+ public void testVersions()
+ {
+ assertTrue(CommitLogDescriptor.isValid("CommitLog-1340512736956320000.log"));
+ assertTrue(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000.log"));
+ assertFalse(CommitLogDescriptor.isValid("CommitLog--1340512736956320000.log"));
+ assertFalse(CommitLogDescriptor.isValid("CommitLog--2-1340512736956320000.log"));
+ assertFalse(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000-123.log"));
+
+ assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-2-1340512736956320000.log").id);
+
+ assertEquals(MessagingService.current_version, new CommitLogDescriptor(1340512736956320000L, null).getMessagingVersion());
+ String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log";
+ assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion());
+ }
+
+ private void testDescriptorPersistence(CommitLogDescriptor desc) throws IOException
+ {
+ ByteBuffer buf = ByteBuffer.allocate(1024);
+ CommitLogDescriptor.writeHeader(buf, desc);
+ long length = buf.position();
+ // Put some extra data in the stream.
+ buf.putDouble(0.1);
+ buf.flip();
+ try (FileDataInput input = new ByteBufferDataInput(buf, "input", 0, 0))
+ {
+ CommitLogDescriptor read = CommitLogDescriptor.readHeader(input);
+ assertEquals("Descriptor length", length, input.getFilePointer());
+ assertEquals("Descriptors", desc, read);
+ }
+ }
+
+ @Test
+ public void testDescriptorPersistence() throws IOException
+ {
+ testDescriptorPersistence(new CommitLogDescriptor(11, null));
+ testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_21, 13, null));
+ testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 15, null));
+ testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 17, new ParameterizedClass("LZ4Compressor", null)));
+ testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 19,
+ new ParameterizedClass("StubbyCompressor", ImmutableMap.of("parameter1", "value1", "flag2", "55", "argument3", "null"))));
+ }
+
+ @Test
+ public void testDescriptorInvalidParametersSize() throws IOException
+ {
+ Map<String, String> params = new HashMap<>();
+ for (int i=0; i<6000; ++i)
+ params.put("key"+i, Integer.toString(i, 16));
+ try {
+ CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_22,
+ 21,
+ new ParameterizedClass("LZ4Compressor", params));
+ ByteBuffer buf = ByteBuffer.allocate(1024000);
+ CommitLogDescriptor.writeHeader(buf, desc);
+ fail("Parameter object too long should fail on writing descriptor.");
+ } catch (ConfigurationException e)
+ {
+ // correct path
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 0ad880b..9999b42 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@ -19,53 +19,46 @@
package org.apache.cassandra.db.commitlog;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
+import java.io.*;
import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
-import com.google.common.collect.ImmutableMap;
-
import org.junit.Assert;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.ReadCommand;
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.db.SliceByNamesReadCommand;
+import org.apache.cassandra.db.*;
import org.apache.cassandra.db.commitlog.CommitLogReplayer.CommitLogReplayException;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.composites.CellNameType;
import org.apache.cassandra.db.filter.NamesQueryFilter;
import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.util.ByteBufferDataInput;
-import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.compress.DeflateCompressor;
+import org.apache.cassandra.io.compress.LZ4Compressor;
+import org.apache.cassandra.io.compress.SnappyCompressor;
import org.apache.cassandra.locator.SimpleStrategy;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.*;
import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+@RunWith(Parameterized.class)
public class CommitLogTest
{
private static final String KEYSPACE1 = "CommitLogTest";
@@ -73,6 +66,27 @@ public class CommitLogTest
private static final String CF1 = "Standard1";
private static final String CF2 = "Standard2";
+ public CommitLogTest(ParameterizedClass commitLogCompression)
+ {
+ DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
+ }
+
+ @Before
+ public void setUp() throws IOException
+ {
+ CommitLog.instance.resetUnsafe(true);
+ }
+
+ @Parameters()
+ public static Collection<Object[]> generateData()
+ {
+ return Arrays.asList(new Object[][] {
+ { null }, // No compression
+ { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.<String, String>emptyMap()) },
+ { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.<String, String>emptyMap()) },
+ { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap()) } });
+ }
+
@BeforeClass
public static void defineSchema() throws ConfigurationException
{
@@ -194,7 +208,6 @@ public class CommitLogTest
@Test
public void testDontDeleteIfDirty() throws Exception
{
- CommitLog.instance.resetUnsafe(true);
// Roughly 32 MB mutation
Mutation rm = new Mutation(KEYSPACE1, bytes("k"));
rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize()/4), 0);
@@ -224,7 +237,6 @@ public class CommitLogTest
public void testDeleteIfNotDirty() throws Exception
{
DatabaseDescriptor.getCommitLogSegmentSize();
- CommitLog.instance.resetUnsafe(true);
// Roughly 32 MB mutation
Mutation rm = new Mutation(KEYSPACE1, bytes("k"));
rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/4) - 1), 0);
@@ -282,8 +294,6 @@ public class CommitLogTest
@Test
public void testEqualRecordLimit() throws Exception
{
- CommitLog.instance.resetUnsafe(true);
-
Mutation rm = new Mutation(KEYSPACE1, bytes("k"));
rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate(getMaxRecordDataSize()), 0);
CommitLog.instance.add(rm);
@@ -292,7 +302,6 @@ public class CommitLogTest
@Test
public void testExceedRecordLimit() throws Exception
{
- CommitLog.instance.resetUnsafe(true);
try
{
Mutation rm = new Mutation(KEYSPACE1, bytes("k"));
@@ -423,25 +432,8 @@ public class CommitLogTest
}
@Test
- public void testVersions()
- {
- Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-1340512736956320000.log"));
- Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000.log"));
- Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--1340512736956320000.log"));
- Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--2-1340512736956320000.log"));
- Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000-123.log"));
-
- Assert.assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-2-1340512736956320000.log").id);
-
- Assert.assertEquals(MessagingService.current_version, new CommitLogDescriptor(1340512736956320000L, null).getMessagingVersion());
- String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log";
- Assert.assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion());
- }
-
- @Test
public void testTruncateWithoutSnapshot() throws ExecutionException, InterruptedException, IOException
{
- CommitLog.instance.resetUnsafe(true);
boolean prev = DatabaseDescriptor.isAutoSnapshot();
DatabaseDescriptor.setAutoSnapshot(false);
ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard1");
@@ -470,7 +462,6 @@ public class CommitLogTest
@Test
public void testTruncateWithoutSnapshotNonDurable() throws IOException
{
- CommitLog.instance.resetUnsafe(true);
boolean prevAutoSnapshot = DatabaseDescriptor.isAutoSnapshot();
DatabaseDescriptor.setAutoSnapshot(false);
Keyspace notDurableKs = Keyspace.open(KEYSPACE2);
@@ -494,48 +485,4 @@ public class CommitLogTest
row = command.getRow(notDurableKs);
Assert.assertEquals(null, row.cf);
}
-
- private void testDescriptorPersistence(CommitLogDescriptor desc) throws IOException
- {
- ByteBuffer buf = ByteBuffer.allocate(1024);
- CommitLogDescriptor.writeHeader(buf, desc);
- long length = buf.position();
- // Put some extra data in the stream.
- buf.putDouble(0.1);
- buf.flip();
- FileDataInput input = new ByteBufferDataInput(buf, "input", 0, 0);
- CommitLogDescriptor read = CommitLogDescriptor.readHeader(input);
- Assert.assertEquals("Descriptor length", length, input.getFilePointer());
- Assert.assertEquals("Descriptors", desc, read);
- }
-
- @Test
- public void testDescriptorPersistence() throws IOException
- {
- testDescriptorPersistence(new CommitLogDescriptor(11, null));
- testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_21, 13, null));
- testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 15, null));
- testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 17, new ParameterizedClass("LZ4Compressor", null)));
- testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 19,
- new ParameterizedClass("StubbyCompressor", ImmutableMap.of("parameter1", "value1", "flag2", "55", "argument3", "null"))));
- }
-
- @Test
- public void testDescriptorInvalidParametersSize() throws IOException
- {
- Map<String, String> params = new HashMap<>();
- for (int i=0; i<6000; ++i)
- params.put("key"+i, Integer.toString(i, 16));
- try {
- CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_22,
- 21,
- new ParameterizedClass("LZ4Compressor", params));
- ByteBuffer buf = ByteBuffer.allocate(1024000);
- CommitLogDescriptor.writeHeader(buf, desc);
- Assert.fail("Parameter object too long should fail on writing descriptor.");
- } catch (ConfigurationException e)
- {
- // correct path
- }
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6c445d6b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
index 7b07c8e..175a8d6 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
@@ -98,7 +98,7 @@ public class CommitLogUpgradeTestMaker
CommitLog commitLog = CommitLog.instance;
System.out.format("\nUsing commit log size %dmb, compressor %s, sync %s%s\n",
mb(DatabaseDescriptor.getCommitLogSegmentSize()),
- commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
+ commitLog.configuration.getCompressorName(),
commitLog.executor.getClass().getSimpleName(),
randomSize ? " random size" : "");
final List<CommitlogExecutor> threads = new ArrayList<>();
[3/4] cassandra git commit: Merge branch cassandra-3.0 into
cassandra-3.7
Posted by bl...@apache.org.
Merge branch cassandra-3.0 into cassandra-3.7
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dc6ffc25
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dc6ffc25
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dc6ffc25
Branch: refs/heads/trunk
Commit: dc6ffc25a8d00659385a1219d0189bd068ef110d
Parents: dbf0310 1e82695
Author: Benjamin Lerer <b....@gmail.com>
Authored: Thu Jun 2 12:47:03 2016 +0200
Committer: Benjamin Lerer <b....@gmail.com>
Committed: Thu Jun 2 12:50:19 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/db/commitlog/CommitLog.java | 102 +++++++++-
.../db/commitlog/CommitLogSegment.java | 15 +-
.../db/commitlog/CommitLogSegmentManager.java | 17 +-
.../db/commitlog/CompressedSegment.java | 4 +-
.../db/commitlog/EncryptedSegment.java | 4 +-
.../db/commitlog/CommitLogStressTest.java | 12 +-
.../db/RecoveryManagerFlushedTest.java | 40 ++++
.../db/RecoveryManagerMissingHeaderTest.java | 38 +++-
.../cassandra/db/RecoveryManagerTest.java | 167 ++++++++++-------
.../db/RecoveryManagerTruncateTest.java | 38 ++++
.../db/commitlog/CommitLogDescriptorTest.java | 3 +-
.../cassandra/db/commitlog/CommitLogTest.java | 187 ++++++-------------
.../db/commitlog/CommitLogUpgradeTestMaker.java | 4 +-
14 files changed, 407 insertions(+), 225 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index a54f4fd,70da4ad..2a66eb4
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -22,11 -20,10 +22,12 @@@ Merged from 2.2
* Enable client encryption in sstableloader with cli options (CASSANDRA-11708)
* Possible memory leak in NIODataInputStream (CASSANDRA-11867)
* Add seconds to cqlsh tracing session duration (CASSANDRA-11753)
+ * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669)
* Prohibit Reversed Counter type as part of the PK (CASSANDRA-9395)
+ * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626)
Merged from 2.1:
++ * Run CommitLog tests with different compression settings (CASSANDRA-9039)
* cqlsh: apply current keyspace to source command (CASSANDRA-11152)
- * Backport CASSANDRA-11578 (CASSANDRA-11750)
* Clear out parent repair session if repair coordinator dies (CASSANDRA-11824)
* Set default streaming_socket_timeout_in_ms to 24 hours (CASSANDRA-11840)
* Do not consider local node a valid source during replace (CASSANDRA-11848)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 10bc91a,dcdd855..4a660ca
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@@ -96,13 -92,10 +94,11 @@@ public class CommitLog implements Commi
@VisibleForTesting
CommitLog(String location, CommitLogArchiver archiver)
{
- compressorClass = DatabaseDescriptor.getCommitLogCompression();
this.location = location;
- ICompressor compressor = compressorClass != null ? CompressionParams.createCompressor(compressorClass) : null;
- this.configuration = new Configuration(DatabaseDescriptor.getCommitLogCompression());
++ this.configuration = new Configuration(DatabaseDescriptor.getCommitLogCompression(),
++ DatabaseDescriptor.getEncryptionContext());
DatabaseDescriptor.createAllDirectories();
- encryptionContext = DatabaseDescriptor.getEncryptionContext();
- this.compressor = compressor;
this.archiver = archiver;
metrics = new CommitLogMetrics();
@@@ -146,7 -139,7 +142,8 @@@
};
// submit all existing files in the commit log dir for archiving prior to recovery - CASSANDRA-6904
-- for (File file : new File(DatabaseDescriptor.getCommitLogLocation()).listFiles(unmanagedFilesFilter))
++ File[] listFiles = new File(DatabaseDescriptor.getCommitLogLocation()).listFiles(unmanagedFilesFilter);
++ for (File file : listFiles)
{
archiver.maybeArchive(file.getPath(), file.getName());
archiver.maybeWaitForArchiving(file.getName());
@@@ -420,6 -413,6 +418,15 @@@
}
/**
++ * FOR TESTING PURPOSES.
++ */
++ public void resetConfiguration()
++ {
++ configuration = new Configuration(DatabaseDescriptor.getCommitLogCompression(),
++ DatabaseDescriptor.getEncryptionContext());
++ }
++
++ /**
* FOR TESTING PURPOSES. See CommitLogAllocator.
*/
public void stopUnsafe(boolean deleteSegments)
@@@ -492,4 -493,59 +499,83 @@@
throw new AssertionError(DatabaseDescriptor.getCommitFailurePolicy());
}
}
+
+ public static final class Configuration
+ {
+ /**
+ * The compressor class.
+ */
+ private final ParameterizedClass compressorClass;
+
+ /**
+ * The compressor used to compress the segments.
+ */
+ private final ICompressor compressor;
+
- public Configuration(ParameterizedClass compressorClass)
++ /**
++ * The encryption context used to encrypt the segments.
++ */
++ private EncryptionContext encryptionContext;
++
++ public Configuration(ParameterizedClass compressorClass, EncryptionContext encryptionContext)
+ {
+ this.compressorClass = compressorClass;
+ this.compressor = compressorClass != null ? CompressionParams.createCompressor(compressorClass) : null;
++ this.encryptionContext = encryptionContext;
+ }
+
+ /**
+ * Checks if the segments must be compressed.
+ * @return <code>true</code> if the segments must be compressed, <code>false</code> otherwise.
+ */
+ public boolean useCompression()
+ {
+ return compressor != null;
+ }
+
+ /**
++ * Checks if the segments must be encrypted.
++ * @return <code>true</code> if the segments must be encrypted, <code>false</code> otherwise.
++ */
++ public boolean useEncryption()
++ {
++ return encryptionContext.isEnabled();
++ }
++
++ /**
+ * Returns the compressor used to compress the segments.
+ * @return the compressor used to compress the segments
+ */
+ public ICompressor getCompressor()
+ {
+ return compressor;
+ }
+
+ /**
+ * Returns the compressor class.
+ * @return the compressor class
+ */
+ public ParameterizedClass getCompressorClass()
+ {
+ return compressorClass;
+ }
+
+ /**
+ * Returns the compressor name.
+ * @return the compressor name.
+ */
+ public String getCompressorName()
+ {
+ return useCompression() ? compressor.getClass().getSimpleName() : "none";
+ }
++
++ /**
++ * Returns the encryption context used to encrypt the segments.
++ * @return the encryption context used to encrypt the segments
++ */
++ public EncryptionContext getEncryptionContext()
++ {
++ return encryptionContext;
++ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 8f8b523,27c05b4..2045c35
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@@ -46,6 -45,6 +46,7 @@@ import org.apache.cassandra.config.CFMe
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.Mutation;
++import org.apache.cassandra.db.commitlog.CommitLog.Configuration;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.util.FileUtils;
@@@ -122,11 -120,8 +123,12 @@@ public abstract class CommitLogSegmen
static CommitLogSegment createSegment(CommitLog commitLog, Runnable onClose)
{
- CommitLogSegment segment = commitLog.encryptionContext.isEnabled() ? new EncryptedSegment(commitLog, commitLog.encryptionContext, onClose) :
- commitLog.compressor != null ? new CompressedSegment(commitLog, onClose) :
- new MemoryMappedSegment(commitLog);
- return commitLog.configuration.useCompression() ? new CompressedSegment(commitLog, onClose)
- : new MemoryMappedSegment(commitLog);
++ Configuration config = commitLog.configuration;
++ CommitLogSegment segment = config.useEncryption() ? new EncryptedSegment(commitLog, onClose)
++ : config.useCompression() ? new CompressedSegment(commitLog, onClose)
++ : new MemoryMappedSegment(commitLog);
+ segment.writeLogHeader();
+ return segment;
}
/**
@@@ -137,7 -132,7 +139,8 @@@
*/
static boolean usesBufferPool(CommitLog commitLog)
{
- return commitLog.encryptionContext.isEnabled() || commitLog.compressor != null;
- return commitLog.configuration.useCompression();
++ Configuration config = commitLog.configuration;
++ return config.useEncryption() || config.useCompression();
}
static long getNextId()
@@@ -152,7 -149,7 +155,9 @@@
{
this.commitLog = commitLog;
id = getNextId();
- descriptor = new CommitLogDescriptor(id, commitLog.compressorClass, commitLog.encryptionContext);
- descriptor = new CommitLogDescriptor(id, commitLog.configuration.getCompressorClass());
++ descriptor = new CommitLogDescriptor(id,
++ commitLog.configuration.getCompressorClass(),
++ commitLog.configuration.getEncryptionContext());
logFile = new File(commitLog.location, descriptor.fileName());
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index 573428a,c73a30a..684fc2c
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@@ -46,8 -68,18 +46,8 @@@ public class CompressedSegment extends
*/
CompressedSegment(CommitLog commitLog, Runnable onClose)
{
- super(commitLog);
+ super(commitLog, onClose);
- this.compressor = commitLog.compressor;
+ this.compressor = commitLog.configuration.getCompressor();
- this.onClose = onClose;
- try
- {
- channel.write((ByteBuffer) buffer.duplicate().flip());
- commitLog.allocator.addSize(lastWrittenPos = buffer.position());
- }
- catch (IOException e)
- {
- throw new FSWriteError(e, getPath());
- }
}
ByteBuffer allocate(int size)
@@@ -57,9 -89,21 +57,9 @@@
ByteBuffer createBuffer(CommitLog commitLog)
{
- return createBuffer(commitLog.compressor.preferredBufferType());
- usedBuffers.incrementAndGet();
- ByteBuffer buf = bufferPool.poll();
- if (buf == null)
- {
- // this.compressor is not yet set, so we must use the commitLog's one.
- buf = commitLog.configuration.getCompressor()
- .preferredBufferType()
- .allocate(DatabaseDescriptor.getCommitLogSegmentSize());
- } else
- buf.clear();
- return buf;
++ return createBuffer(commitLog.configuration.getCompressor().preferredBufferType());
}
- static long startMillis = System.currentTimeMillis();
-
@Override
void write(int startMarker, int nextMarker)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
index 731dea4,0000000..c34a365
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/EncryptedSegment.java
@@@ -1,161 -1,0 +1,161 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import javax.crypto.Cipher;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.compress.ICompressor;
+import org.apache.cassandra.security.EncryptionUtils;
+import org.apache.cassandra.security.EncryptionContext;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Hex;
+import org.apache.cassandra.utils.SyncUtil;
+
+import static org.apache.cassandra.security.EncryptionUtils.ENCRYPTED_BLOCK_HEADER_SIZE;
+
+/**
+ * Writes encrypted segments to disk. Data is compressed before encrypting to (hopefully) reduce the size of the data fed into
+ * the encryption algorithms.
+ *
+ * The format of the encrypted commit log is as follows:
+ * - standard commit log header (as written by {@link CommitLogDescriptor#writeHeader(ByteBuffer, CommitLogDescriptor)})
+ * - a series of 'sync segments' that are written every time the commit log is sync()'ed
+ * -- a sync section header, see {@link CommitLogSegment#writeSyncMarker(ByteBuffer, int, int, int)}
+ * -- total plain text length for this section
+ * -- a series of encrypted data blocks, each of which contains:
+ * --- the length of the encrypted block (cipher text)
+ * --- the length of the unencrypted data (compressed text)
+ * --- the encrypted block, which contains:
+ * ---- the length of the plain text (raw) data
+ * ---- block of compressed data
+ *
+ * Notes:
+ * - "length of the unencrypted data" is different from the length of the resulting decrypted buffer as encryption adds padding
+ * to the output buffer, and we need to ignore that padding when processing.
+ */
+public class EncryptedSegment extends FileDirectSegment
+{
+ private static final Logger logger = LoggerFactory.getLogger(EncryptedSegment.class);
+
+ private static final int ENCRYPTED_SECTION_HEADER_SIZE = SYNC_MARKER_SIZE + 4;
+
+ private final EncryptionContext encryptionContext;
+ private final Cipher cipher;
+
- public EncryptedSegment(CommitLog commitLog, EncryptionContext encryptionContext, Runnable onClose)
++ public EncryptedSegment(CommitLog commitLog, Runnable onClose)
+ {
+ super(commitLog, onClose);
- this.encryptionContext = encryptionContext;
++ this.encryptionContext = commitLog.configuration.getEncryptionContext();
+
+ try
+ {
+ cipher = encryptionContext.getEncryptor();
+ }
+ catch (IOException e)
+ {
+ throw new FSWriteError(e, logFile);
+ }
+ logger.debug("created a new encrypted commit log segment: {}", logFile);
+ }
+
+ protected Map<String, String> additionalHeaderParameters()
+ {
+ Map<String, String> map = encryptionContext.toHeaderParameters();
+ map.put(EncryptionContext.ENCRYPTION_IV, Hex.bytesToHex(cipher.getIV()));
+ return map;
+ }
+
+ ByteBuffer createBuffer(CommitLog commitLog)
+ {
+ //Note: we want to keep the compression buffers on-heap as we need those bytes for encryption,
+ // and we want to avoid copying from off-heap (compression buffer) to on-heap encryption APIs
+ return createBuffer(BufferType.ON_HEAP);
+ }
+
+ void write(int startMarker, int nextMarker)
+ {
+ int contentStart = startMarker + SYNC_MARKER_SIZE;
+ final int length = nextMarker - contentStart;
+ // The length may be 0 when the segment is being closed.
+ assert length > 0 || length == 0 && !isStillAllocating();
+
+ final ICompressor compressor = encryptionContext.getCompressor();
+ final int blockSize = encryptionContext.getChunkLength();
+ try
+ {
+ ByteBuffer inputBuffer = buffer.duplicate();
+ inputBuffer.limit(contentStart + length).position(contentStart);
+ ByteBuffer buffer = reusableBufferHolder.get();
+
+ // save space for the sync marker at the beginning of this section
+ final long syncMarkerPosition = lastWrittenPos;
+ channel.position(syncMarkerPosition + ENCRYPTED_SECTION_HEADER_SIZE);
+
+ // loop over the segment data in encryption buffer sized chunks
+ while (contentStart < nextMarker)
+ {
+ int nextBlockSize = nextMarker - blockSize > contentStart ? blockSize : nextMarker - contentStart;
+ ByteBuffer slice = inputBuffer.duplicate();
+ slice.limit(contentStart + nextBlockSize).position(contentStart);
+
+ buffer = EncryptionUtils.compress(slice, buffer, true, compressor);
+
+ // reuse the same buffer for the input and output of the encryption operation
+ buffer = EncryptionUtils.encryptAndWrite(buffer, channel, true, cipher);
+
+ contentStart += nextBlockSize;
+ commitLog.allocator.addSize(buffer.limit() + ENCRYPTED_BLOCK_HEADER_SIZE);
+ }
+
+ lastWrittenPos = channel.position();
+
+ // rewind to the beginning of the section and write out the sync marker,
+ // reusing one of the existing buffers
+ buffer = ByteBufferUtil.ensureCapacity(buffer, ENCRYPTED_SECTION_HEADER_SIZE, true);
+ writeSyncMarker(buffer, 0, (int) syncMarkerPosition, (int) lastWrittenPos);
+ buffer.putInt(SYNC_MARKER_SIZE, length);
+ buffer.position(0).limit(ENCRYPTED_SECTION_HEADER_SIZE);
+ commitLog.allocator.addSize(buffer.limit());
+
+ channel.position(syncMarkerPosition);
+ channel.write(buffer);
+
+ SyncUtil.force(channel, true);
+
+ if (reusableBufferHolder.get().capacity() < buffer.capacity())
+ reusableBufferHolder.set(buffer);
+ }
+ catch (Exception e)
+ {
+ throw new FSWriteError(e, getPath());
+ }
+ }
+
+ public long onDiskSize()
+ {
+ return lastWrittenPos;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --cc test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index 8e45eea,d517055..0474b32
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@@ -200,43 -198,34 +200,43 @@@ public class CommitLogStressTes
DatabaseDescriptor.setCommitLogSyncBatchWindow(1);
DatabaseDescriptor.setCommitLogSyncPeriod(30);
DatabaseDescriptor.setCommitLogSegmentSize(32);
- for (ParameterizedClass compressor : new ParameterizedClass[] {
- null,
- new ParameterizedClass("LZ4Compressor", null),
- new ParameterizedClass("SnappyCompressor", null),
- new ParameterizedClass("DeflateCompressor", null) })
+
+ // test plain vanilla commit logs (the choice of 98% of users)
+ testLog(null, EncryptionContextGenerator.createDisabledContext());
+
+ // test the compression types
+ testLog(new ParameterizedClass("LZ4Compressor", null), EncryptionContextGenerator.createDisabledContext());
+ testLog(new ParameterizedClass("SnappyCompressor", null), EncryptionContextGenerator.createDisabledContext());
+ testLog(new ParameterizedClass("DeflateCompressor", null), EncryptionContextGenerator.createDisabledContext());
+
+ // test the encrypted commit log
+ testLog(null, EncryptionContextGenerator.createContext(true));
+ }
+
+ public void testLog(ParameterizedClass compression, EncryptionContext encryptionContext) throws IOException, InterruptedException
+ {
+ DatabaseDescriptor.setCommitLogCompression(compression);
+ DatabaseDescriptor.setEncryptionContext(encryptionContext);
+ for (CommitLogSync sync : CommitLogSync.values())
{
- DatabaseDescriptor.setCommitLogCompression(compressor);
- for (CommitLogSync sync : CommitLogSync.values())
- {
- DatabaseDescriptor.setCommitLogSync(sync);
- CommitLog commitLog = new CommitLog(location, CommitLogArchiver.disabled()).start();
- testLog(commitLog);
- }
+ DatabaseDescriptor.setCommitLogSync(sync);
+ CommitLog commitLog = new CommitLog(location, CommitLogArchiver.disabled()).start();
+ testLog(commitLog);
+ assert !failed;
}
- assert !failed;
}
- public void testLog(CommitLog commitLog) throws IOException, InterruptedException
- {
- System.out.format("\nTesting commit log size %.0fmb, compressor %s, sync %s%s%s\n",
- mb(DatabaseDescriptor.getCommitLogSegmentSize()),
- commitLog.configuration.getCompressorName(),
- commitLog.executor.getClass().getSimpleName(),
- randomSize ? " random size" : "",
- discardedRun ? " with discarded run" : "");
+ public void testLog(CommitLog commitLog) throws IOException, InterruptedException {
+ System.out.format("\nTesting commit log size %.0fmb, compressor: %s, encryption enabled: %b, sync %s%s%s\n",
+ mb(DatabaseDescriptor.getCommitLogSegmentSize()),
- commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
- commitLog.encryptionContext.isEnabled(),
++ commitLog.configuration.getCompressorName(),
++ commitLog.configuration.useEncryption(),
+ commitLog.executor.getClass().getSimpleName(),
+ randomSize ? " random size" : "",
+ discardedRun ? " with discarded run" : "");
commitLog.allocator.enableReserveSegmentCreation();
-
- final List<CommitlogExecutor> threads = new ArrayList<>();
+
+ final List<CommitlogThread> threads = new ArrayList<>();
ScheduledExecutorService scheduled = startThreads(commitLog, threads);
discardedPos = ReplayPosition.NONE;
@@@ -294,17 -282,14 +294,17 @@@
Assert.fail("Failed to delete " + f);
if (hash == repl.hash && cells == repl.cells)
- System.out.println("Test success.");
+ System.out.format("Test success. compressor = %s, encryption enabled = %b; discarded = %d, skipped = %d\n",
- commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
- commitLog.encryptionContext.isEnabled(),
++ commitLog.configuration.getCompressorName(),
++ commitLog.configuration.useEncryption(),
+ repl.discarded, repl.skipped);
else
{
- System.out.format("Test failed. Cells %d expected %d, hash %d expected %d.\n",
- repl.cells,
- cells,
- repl.hash,
- hash);
+ System.out.format("Test failed (compressor = %s, encryption enabled = %b). Cells %d, expected %d, diff %d; discarded = %d, skipped = %d - hash %d expected %d.\n",
- commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
- commitLog.encryptionContext.isEnabled(),
++ commitLog.configuration.getCompressorName(),
++ commitLog.configuration.useEncryption(),
+ repl.cells, cells, cells - repl.cells, repl.discarded, repl.skipped,
+ repl.hash, hash);
failed = true;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
index e24af0f,d06c112..86fa5b4
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
@@@ -25,13 -34,19 +34,21 @@@ import org.slf4j.Logger
import org.slf4j.LoggerFactory;
import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.compress.DeflateCompressor;
+ import org.apache.cassandra.io.compress.LZ4Compressor;
+ import org.apache.cassandra.io.compress.SnappyCompressor;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.SchemaKeyspace;
++import org.apache.cassandra.security.EncryptionContext;
++import org.apache.cassandra.security.EncryptionContextGenerator;
import org.apache.cassandra.utils.FBUtilities;
+ @RunWith(Parameterized.class)
public class RecoveryManagerFlushedTest
{
private static Logger logger = LoggerFactory.getLogger(RecoveryManagerFlushedTest.class);
@@@ -40,14 -55,35 +57,37 @@@
private static final String CF_STANDARD1 = "Standard1";
private static final String CF_STANDARD2 = "Standard2";
- @BeforeClass
- public static void defineSchema() throws ConfigurationException
++ public RecoveryManagerFlushedTest(ParameterizedClass commitLogCompression, EncryptionContext encryptionContext)
+ {
- SchemaLoader.prepareServer();
- SchemaLoader.createKeyspace(KEYSPACE1,
- KeyspaceParams.simple(1),
- SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1),
- SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2));
++ DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
++ DatabaseDescriptor.setEncryptionContext(encryptionContext);
+ }
+
- public RecoveryManagerFlushedTest(ParameterizedClass commitLogCompression)
++ @Parameters()
++ public static Collection<Object[]> generateData()
+ {
- DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
++ return Arrays.asList(new Object[][]{
++ {null, EncryptionContextGenerator.createDisabledContext()}, // No compression, no encryption
++ {null, EncryptionContextGenerator.createContext(true)}, // Encryption
++ {new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++ {new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++ {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}});
+ }
+
+ @Before
+ public void setUp() throws IOException
+ {
+ CommitLog.instance.resetUnsafe(true);
+ }
+
- @Parameters()
- public static Collection<Object[]> generateData()
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
{
- return Arrays.asList(new Object[][] {
- { null }, // No compression
- { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
- { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
- { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE1,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1),
+ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2));
}
@Test
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
index 9275dae,8ac7c5d..a67e9e5
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
@@@ -28,13 -35,17 +35,19 @@@ import org.junit.runners.Parameterized.
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.DatabaseDescriptor;
- import org.apache.cassandra.db.rows.AbstractUnfilteredRowIterator;
+ import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.compress.DeflateCompressor;
+ import org.apache.cassandra.io.compress.LZ4Compressor;
+ import org.apache.cassandra.io.compress.SnappyCompressor;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.schema.KeyspaceParams;
++import org.apache.cassandra.security.EncryptionContext;
++import org.apache.cassandra.security.EncryptionContextGenerator;
+ @RunWith(Parameterized.class)
public class RecoveryManagerMissingHeaderTest
{
private static final String KEYSPACE1 = "RecoveryManager3Test1";
@@@ -43,6 -54,27 +56,29 @@@
private static final String KEYSPACE2 = "RecoveryManager3Test2";
private static final String CF_STANDARD3 = "Standard3";
- public RecoveryManagerMissingHeaderTest(ParameterizedClass commitLogCompression)
++ public RecoveryManagerMissingHeaderTest(ParameterizedClass commitLogCompression, EncryptionContext encryptionContext)
+ {
+ DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
++ DatabaseDescriptor.setEncryptionContext(encryptionContext);
+ }
+
- @Before
- public void setUp() throws IOException
++ @Parameters()
++ public static Collection<Object[]> generateData()
+ {
- CommitLog.instance.resetUnsafe(true);
++ return Arrays.asList(new Object[][]{
++ {null, EncryptionContextGenerator.createDisabledContext()}, // No compression, no encryption
++ {null, EncryptionContextGenerator.createContext(true)}, // Encryption
++ {new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++ {new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++ {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}});
+ }
+
- @Parameters()
- public static Collection<Object[]> generateData()
++ @Before
++ public void setUp() throws IOException
+ {
- return Arrays.asList(new Object[][] {
- { null }, // No compression
- { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
- { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
- { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
++ CommitLog.instance.resetUnsafe(true);
+ }
+
@BeforeClass
public static void defineSchema() throws ConfigurationException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
index 5ac53f6,397030a..37d719e
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
@@@ -19,40 -19,43 +19,51 @@@
package org.apache.cassandra.db;
import java.io.IOException;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Collections;
import java.util.Date;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
- import org.apache.cassandra.OrderedJUnit4ClassRunner;
-import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.ColumnDefinition;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.config.ParameterizedClass;
-import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.commitlog.CommitLogArchiver;
+import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.context.CounterContext;
-import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.compress.DeflateCompressor;
+ import org.apache.cassandra.io.compress.LZ4Compressor;
+ import org.apache.cassandra.io.compress.SnappyCompressor;
-import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.utils.ByteBufferUtil;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
++import org.junit.runners.Parameterized;
++import org.junit.runners.Parameterized.Parameters;
import static org.junit.Assert.assertEquals;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.commitlog.CommitLogArchiver;
+import org.apache.cassandra.schema.KeyspaceParams;
++import org.apache.cassandra.security.EncryptionContext;
++import org.apache.cassandra.security.EncryptionContextGenerator;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.db.commitlog.CommitLogReplayer;
+
- @RunWith(OrderedJUnit4ClassRunner.class)
+ @RunWith(Parameterized.class)
public class RecoveryManagerTest
{
private static Logger logger = LoggerFactory.getLogger(RecoveryManagerTest.class);
@@@ -123,6 -67,6 +75,29 @@@
private static final String KEYSPACE2 = "RecoveryManagerTest2";
private static final String CF_STANDARD3 = "Standard3";
++ public RecoveryManagerTest(ParameterizedClass commitLogCompression, EncryptionContext encryptionContext)
++ {
++ DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
++ DatabaseDescriptor.setEncryptionContext(encryptionContext);
++ }
++
++ @Parameters()
++ public static Collection<Object[]> generateData()
++ {
++ return Arrays.asList(new Object[][]{
++ {null, EncryptionContextGenerator.createDisabledContext()}, // No compression, no encryption
++ {null, EncryptionContextGenerator.createContext(true)}, // Encryption
++ {new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++ {new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++ {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}});
++ }
++
++ @Before
++ public void setUp() throws IOException
++ {
++ CommitLog.instance.resetUnsafe(true);
++ }
++
@BeforeClass
public static void defineSchema() throws ConfigurationException
{
@@@ -139,6 -83,6 +114,7 @@@
@Before
public void clearData()
{
++ // clear data
Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1).truncateBlocking();
Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_COUNTER1).truncateBlocking();
Keyspace.open(KEYSPACE2).getColumnFamilyStore(CF_STANDARD3).truncateBlocking();
@@@ -151,77 -103,11 +127,78 @@@
}
@Test
- public void testNothingToRecover() throws IOException
+ public void testRecoverBlocksOnBytesOutstanding() throws Exception
{
- CommitLog.instance.resetUnsafe(true);
+ long originalMaxOutstanding = CommitLogReplayer.MAX_OUTSTANDING_REPLAY_BYTES;
+ CommitLogReplayer.MAX_OUTSTANDING_REPLAY_BYTES = 1;
+ CommitLogReplayer.MutationInitiator originalInitiator = CommitLogReplayer.mutationInitiator;
++ MockInitiator mockInitiator = new MockInitiator();
+ CommitLogReplayer.mutationInitiator = mockInitiator;
+ try
+ {
+ CommitLog.instance.resetUnsafe(true);
+ Keyspace keyspace1 = Keyspace.open(KEYSPACE1);
+ Keyspace keyspace2 = Keyspace.open(KEYSPACE2);
+
+ UnfilteredRowIterator upd1 = Util.apply(new RowUpdateBuilder(keyspace1.getColumnFamilyStore(CF_STANDARD1).metadata, 1L, 0, "keymulti")
+ .clustering("col1").add("val", "1")
+ .build());
+
+ UnfilteredRowIterator upd2 = Util.apply(new RowUpdateBuilder(keyspace2.getColumnFamilyStore(CF_STANDARD3).metadata, 1L, 0, "keymulti")
+ .clustering("col2").add("val", "1")
+ .build());
+
+ keyspace1.getColumnFamilyStore("Standard1").clearUnsafe();
+ keyspace2.getColumnFamilyStore("Standard3").clearUnsafe();
+
+ DecoratedKey dk = Util.dk("keymulti");
+ Assert.assertTrue(Util.getAllUnfiltered(Util.cmd(keyspace1.getColumnFamilyStore(CF_STANDARD1), dk).build()).isEmpty());
+ Assert.assertTrue(Util.getAllUnfiltered(Util.cmd(keyspace2.getColumnFamilyStore(CF_STANDARD3), dk).build()).isEmpty());
+
+ final AtomicReference<Throwable> err = new AtomicReference<Throwable>();
+ Thread t = new Thread() {
+ @Override
+ public void run()
+ {
+ try
+ {
+ CommitLog.instance.resetUnsafe(false); // disassociate segments from live CL
+ }
+ catch (Throwable t)
+ {
+ err.set(t);
+ }
+ }
+ };
+ t.start();
- Assert.assertTrue(blocked.tryAcquire(1, 20, TimeUnit.SECONDS));
++ Assert.assertTrue(mockInitiator.blocked.tryAcquire(1, 20, TimeUnit.SECONDS));
+ Thread.sleep(100);
+ Assert.assertTrue(t.isAlive());
- blocker.release(Integer.MAX_VALUE);
++ mockInitiator.blocker.release(Integer.MAX_VALUE);
+ t.join(20 * 1000);
+
+ if (err.get() != null)
+ throw new RuntimeException(err.get());
+
+ if (t.isAlive())
+ {
+ Throwable toPrint = new Throwable();
+ toPrint.setStackTrace(Thread.getAllStackTraces().get(t));
+ toPrint.printStackTrace(System.out);
+ }
+ Assert.assertFalse(t.isAlive());
+
+ Assert.assertTrue(Util.equal(upd1, Util.getOnlyPartitionUnfiltered(Util.cmd(keyspace1.getColumnFamilyStore(CF_STANDARD1), dk).build()).unfilteredIterator()));
+ Assert.assertTrue(Util.equal(upd2, Util.getOnlyPartitionUnfiltered(Util.cmd(keyspace2.getColumnFamilyStore(CF_STANDARD3), dk).build()).unfilteredIterator()));
+ }
+ finally
+ {
+ CommitLogReplayer.mutationInitiator = originalInitiator;
+ CommitLogReplayer.MAX_OUTSTANDING_REPLAY_BYTES = originalMaxOutstanding;
+ }
}
+
@Test
public void testOne() throws IOException
{
@@@ -273,8 -159,8 +250,8 @@@
@Test
public void testRecoverPIT() throws Exception
{
-- ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
CommitLog.instance.resetUnsafe(true);
++ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
Date date = CommitLogArchiver.format.parse("2112:12:12 12:12:12");
long timeMS = date.getTime() - 5000;
@@@ -301,8 -187,8 +278,8 @@@
@Test
public void testRecoverPITUnordered() throws Exception
{
-- ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
CommitLog.instance.resetUnsafe(true);
++ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
Date date = CommitLogArchiver.format.parse("2112:12:12 12:12:12");
long timeMS = date.getTime();
@@@ -332,4 -218,4 +309,64 @@@
assertEquals(2, Util.getAll(Util.cmd(cfs).build()).size());
}
++
++ private static class MockInitiator extends CommitLogReplayer.MutationInitiator
++ {
++ final Semaphore blocker = new Semaphore(0);
++ final Semaphore blocked = new Semaphore(0);
++
++ @Override
++ protected Future<Integer> initiateMutation(final Mutation mutation,
++ final long segmentId,
++ final int serializedSize,
++ final int entryLocation,
++ final CommitLogReplayer clr)
++ {
++ final Future<Integer> toWrap = super.initiateMutation(mutation,
++ segmentId,
++ serializedSize,
++ entryLocation,
++ clr);
++ return new Future<Integer>()
++ {
++
++ @Override
++ public boolean cancel(boolean mayInterruptIfRunning)
++ {
++ throw new UnsupportedOperationException();
++ }
++
++ @Override
++ public boolean isCancelled()
++ {
++ throw new UnsupportedOperationException();
++ }
++
++ @Override
++ public boolean isDone()
++ {
++ return blocker.availablePermits() > 0 && toWrap.isDone();
++ }
++
++ @Override
++ public Integer get() throws InterruptedException, ExecutionException
++ {
++ System.out.println("Got blocker once");
++ blocked.release();
++ blocker.acquire();
++ return toWrap.get();
++ }
++
++ @Override
++ public Integer get(long timeout, TimeUnit unit)
++ throws InterruptedException, ExecutionException, TimeoutException
++ {
++ blocked.release();
++ blocker.tryAcquire(1, timeout, unit);
++ return toWrap.get(timeout, unit);
++ }
++
++ };
++ }
++ };
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
index 7c8ab7d,5a59f1c..738888f
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
@@@ -19,17 -19,29 +19,31 @@@
package org.apache.cassandra.db;
import java.io.IOException;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Collections;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.compress.DeflateCompressor;
+ import org.apache.cassandra.io.compress.LZ4Compressor;
+ import org.apache.cassandra.io.compress.SnappyCompressor;
import org.apache.cassandra.schema.KeyspaceParams;
++import org.apache.cassandra.security.EncryptionContext;
++import org.apache.cassandra.security.EncryptionContextGenerator;
+
++import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
++import org.junit.runner.RunWith;
++import org.junit.runners.Parameterized;
++import org.junit.runners.Parameterized.Parameters;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
/**
* Test for the truncate operation.
@@@ -39,6 -52,27 +54,29 @@@ public class RecoveryManagerTruncateTes
private static final String KEYSPACE1 = "RecoveryManagerTruncateTest";
private static final String CF_STANDARD1 = "Standard1";
- public RecoveryManagerTruncateTest(ParameterizedClass commitLogCompression)
++ public RecoveryManagerTruncateTest(ParameterizedClass commitLogCompression, EncryptionContext encryptionContext)
+ {
+ DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
++ DatabaseDescriptor.setEncryptionContext(encryptionContext);
+ }
+
- @Before
- public void setUp() throws IOException
++ @Parameters()
++ public static Collection<Object[]> generateData()
+ {
- CommitLog.instance.resetUnsafe(true);
++ return Arrays.asList(new Object[][]{
++ {null, EncryptionContextGenerator.createDisabledContext()}, // No compression, no encryption
++ {null, EncryptionContextGenerator.createContext(true)}, // Encryption
++ {new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++ {new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++ {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}});
+ }
+
- @Parameters()
- public static Collection<Object[]> generateData()
++ @Before
++ public void setUp() throws IOException
+ {
- return Arrays.asList(new Object[][] {
- { null }, // No compression
- { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
- { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
- { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
++ CommitLog.instance.resetUnsafe(true);
+ }
+
@BeforeClass
public static void defineSchema() throws ConfigurationException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
index ab9cb6f,898c19f..fdedafd
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
@@@ -15,6 -15,6 +15,7 @@@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
++
package org.apache.cassandra.db.commitlog;
import java.io.IOException;
@@@ -117,195 -83,20 +118,195 @@@ public class CommitLogDescriptorTes
@Test
public void testDescriptorInvalidParametersSize() throws IOException
{
- final int numberOfParameters = 65535;
- Map<String, String> params = new HashMap<>(numberOfParameters);
- for (int i=0; i<numberOfParameters; ++i)
+ Map<String, String> params = new HashMap<>();
+ for (int i=0; i<65535; ++i)
params.put("key"+i, Integer.toString(i, 16));
try {
- CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_30,
+ CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_22,
21,
- new ParameterizedClass("LZ4Compressor", params));
+ new ParameterizedClass("LZ4Compressor", params),
+ neverEnabledEncryption);
++
ByteBuffer buf = ByteBuffer.allocate(1024000);
CommitLogDescriptor.writeHeader(buf, desc);
- fail("Parameter object too long should fail on writing descriptor.");
+ Assert.fail("Parameter object too long should fail on writing descriptor.");
} catch (ConfigurationException e)
{
// correct path
}
}
+
+ @Test
+ public void constructParametersString_NoCompressionOrEncryption()
+ {
+ String json = CommitLogDescriptor.constructParametersString(null, null, Collections.emptyMap());
+ Assert.assertFalse(json.contains(CommitLogDescriptor.COMPRESSION_CLASS_KEY));
+ Assert.assertFalse(json.contains(EncryptionContext.ENCRYPTION_CIPHER));
+
+ json = CommitLogDescriptor.constructParametersString(null, neverEnabledEncryption, Collections.emptyMap());
+ Assert.assertFalse(json.contains(CommitLogDescriptor.COMPRESSION_CLASS_KEY));
+ Assert.assertFalse(json.contains(EncryptionContext.ENCRYPTION_CIPHER));
+ }
+
+ @Test
+ public void constructParametersString_WithCompressionAndEncryption()
+ {
+ String json = CommitLogDescriptor.constructParametersString(compression, enabledEncryption, Collections.emptyMap());
+ Assert.assertTrue(json.contains(CommitLogDescriptor.COMPRESSION_CLASS_KEY));
+ Assert.assertTrue(json.contains(EncryptionContext.ENCRYPTION_CIPHER));
+ }
+
+ @Test
+ public void writeAndReadHeader_NoCompressionOrEncryption() throws IOException
+ {
+ CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption);
+ ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);
+ CommitLogDescriptor.writeHeader(buffer, descriptor);
+ buffer.flip();
+ FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0);
+ CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, neverEnabledEncryption);
+ Assert.assertNotNull(result);
+ Assert.assertNull(result.compression);
+ Assert.assertFalse(result.getEncryptionContext().isEnabled());
+ }
+
+ @Test
+ public void writeAndReadHeader_OnlyCompression() throws IOException
+ {
+ CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, neverEnabledEncryption);
+ ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);
+ CommitLogDescriptor.writeHeader(buffer, descriptor);
+ buffer.flip();
+ FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0);
+ CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, neverEnabledEncryption);
+ Assert.assertNotNull(result);
+ Assert.assertEquals(compression, result.compression);
+ Assert.assertFalse(result.getEncryptionContext().isEnabled());
+ }
+
+ @Test
+ public void writeAndReadHeader_WithEncryptionHeader_EncryptionEnabledInYaml() throws IOException
+ {
+ CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, enabledEncryption);
+ ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);
+ CommitLogDescriptor.writeHeader(buffer, descriptor);
+ buffer.flip();
+ FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0);
+ CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, enabledEncryption);
+ Assert.assertNotNull(result);
+ Assert.assertNull(result.compression);
+ Assert.assertTrue(result.getEncryptionContext().isEnabled());
+ Assert.assertArrayEquals(iv, result.getEncryptionContext().getIV());
+ }
+
+ /**
+ * Check that even though encryption (TDE options) is disabled in the yaml, we can still read a commit log header that was written with encryption enabled.
+ */
+ @Test
+ public void writeAndReadHeader_WithEncryptionHeader_EncryptionDisabledInYaml() throws IOException
+ {
+ CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, enabledEncryption);
+ ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);
+ CommitLogDescriptor.writeHeader(buffer, descriptor);
+ buffer.flip();
+ FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0);
+ CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, previouslyEnabledEncryption);
+ Assert.assertNotNull(result);
+ Assert.assertNull(result.compression);
+ Assert.assertTrue(result.getEncryptionContext().isEnabled());
+ Assert.assertArrayEquals(iv, result.getEncryptionContext().getIV());
+ }
+
+ /**
+ * Shouldn't happen in the real world (only one of compression or encryption/TDE options should be configured),
+ * but the header must still be written and read back correctly when both are set
+ */
+ @Test
+ public void writeAndReadHeader_WithCompressionAndEncryption() throws IOException
+ {
+ CommitLogDescriptor descriptor = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, enabledEncryption);
+ ByteBuffer buffer = ByteBuffer.allocate(16 * 1024);
+ CommitLogDescriptor.writeHeader(buffer, descriptor);
+ buffer.flip();
+ FileSegmentInputStream dataInput = new FileSegmentInputStream(buffer, null, 0);
+ CommitLogDescriptor result = CommitLogDescriptor.readHeader(dataInput, enabledEncryption);
+ Assert.assertNotNull(result);
+ Assert.assertEquals(compression, result.compression);
+ Assert.assertTrue(result.getEncryptionContext().isEnabled());
+ Assert.assertEquals(enabledEncryption, result.getEncryptionContext());
+ Assert.assertArrayEquals(iv, result.getEncryptionContext().getIV());
+ }
+
+ @Test
+ public void equals_NoCompressionOrEncryption()
+ {
+ CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, null);
+ Assert.assertEquals(desc1, desc1);
+
+ CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, null);
+ Assert.assertEquals(desc1, desc2);
+
+ desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption);
+ Assert.assertEquals(desc1, desc1);
+ desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption);
+ Assert.assertEquals(desc1, desc2);
+
+ desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, previouslyEnabledEncryption);
+ Assert.assertEquals(desc1, desc1);
+ desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, previouslyEnabledEncryption);
+ Assert.assertEquals(desc1, desc2);
+ }
+
+ @Test
+ public void equals_OnlyCompression()
+ {
+ CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, null);
+ Assert.assertEquals(desc1, desc1);
+
+ CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, null);
+ Assert.assertEquals(desc1, desc2);
+
+ desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, neverEnabledEncryption);
+ Assert.assertEquals(desc1, desc1);
+ desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, neverEnabledEncryption);
+ Assert.assertEquals(desc1, desc2);
+
+ desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, previouslyEnabledEncryption);
+ Assert.assertEquals(desc1, desc1);
+ desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, previouslyEnabledEncryption);
+ Assert.assertEquals(desc1, desc2);
+ }
+
+ @Test
+ public void equals_OnlyEncryption()
+ {
+ CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, enabledEncryption);
+ Assert.assertEquals(desc1, desc1);
+
+ CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, enabledEncryption);
+ Assert.assertEquals(desc1, desc2);
+
+ desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption);
+ Assert.assertEquals(desc1, desc1);
+ desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, neverEnabledEncryption);
+ Assert.assertEquals(desc1, desc2);
+
+ desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, previouslyEnabledEncryption);
+ Assert.assertEquals(desc1, desc1);
+ desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, previouslyEnabledEncryption);
+ Assert.assertEquals(desc1, desc2);
+ }
+
+ /**
+ * Compression and encryption shouldn't both be enabled in real life, but ensure descriptor equality is still correct when they are
+ */
+ @Test
+ public void equals_BothCompressionAndEncryption()
+ {
+ CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, enabledEncryption);
+ Assert.assertEquals(desc1, desc1);
+
+ CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, compression, enabledEncryption);
+ Assert.assertEquals(desc1, desc2);
+ }
-
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 1ea0eb1,39ba886..caa9fee
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -26,9 -34,13 +26,12 @@@ import java.util.concurrent.ExecutionEx
import java.util.zip.CRC32;
import java.util.zip.Checksum;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import com.google.common.collect.Iterables;
+
+import org.junit.*;
+ import org.junit.runner.RunWith;
+ import org.junit.runners.Parameterized;
+ import org.junit.runners.Parameterized.Parameters;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
@@@ -46,13 -58,11 +52,16 @@@ import org.apache.cassandra.exceptions.
import org.apache.cassandra.io.compress.DeflateCompressor;
import org.apache.cassandra.io.compress.LZ4Compressor;
import org.apache.cassandra.io.compress.SnappyCompressor;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.KeyspaceParams;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.security.EncryptionContext;
+import org.apache.cassandra.security.EncryptionContextGenerator;
- import org.apache.cassandra.utils.*;
++import org.apache.cassandra.utils.Hex;
+ import org.apache.cassandra.utils.JVMStabilityInspector;
+ import org.apache.cassandra.utils.KillerForTests;
++import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.vint.VIntCoding;
import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
@@@ -66,7 -77,26 +76,22 @@@ public class CommitLogTes
private static final String STANDARD1 = "Standard1";
private static final String STANDARD2 = "Standard2";
- String logDirectory;
- public CommitLogTest(ParameterizedClass commitLogCompression)
++ public CommitLogTest(ParameterizedClass commitLogCompression, EncryptionContext encryptionContext)
+ {
+ DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
- }
-
- @Before
- public void setUp() throws IOException
- {
- CommitLog.instance.resetUnsafe(true);
++ DatabaseDescriptor.setEncryptionContext(encryptionContext);
+ }
+
+ @Parameters()
+ public static Collection<Object[]> generateData()
+ {
- return Arrays.asList(new Object[][] {
- { null }, // No compression
- { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
- { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
- { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
++ return Arrays.asList(new Object[][]{
++ {null, EncryptionContextGenerator.createDisabledContext()}, // No compression, no encryption
++ {null, EncryptionContextGenerator.createContext(true)}, // Encryption
++ {new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++ {new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()},
++ {new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()), EncryptionContextGenerator.createDisabledContext()}});
+ }
@BeforeClass
public static void defineSchema() throws ConfigurationException
@@@ -83,13 -113,6 +108,12 @@@
CompactionManager.instance.disableAutoCompaction();
}
+ @Before
+ public void setup() throws IOException
+ {
- logDirectory = DatabaseDescriptor.getCommitLogLocation() + "/unit";
- new File(logDirectory).mkdirs();
++ CommitLog.instance.resetUnsafe(true);
+ }
+
@Test
public void testRecoveryWithEmptyLog() throws Exception
{
@@@ -302,17 -330,25 +322,16 @@@
CommitLog.instance.add(rm);
}
- @Test
+ @Test(expected = IllegalArgumentException.class)
public void testExceedRecordLimit() throws Exception
{
-- CommitLog.instance.resetUnsafe(true);
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
- try
- {
- Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, "k")
- .clustering("bytes")
- .add("val", ByteBuffer.allocate(1 + getMaxRecordDataSize()))
- .build();
- CommitLog.instance.add(rm);
- throw new AssertionError("mutation larger than limit was accepted");
- }
- catch (IllegalArgumentException e)
- {
- // IAE is thrown on too-large mutations
- }
+ Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, "k")
+ .clustering("bytes")
+ .add("val", ByteBuffer.allocate(1 + getMaxRecordDataSize()))
+ .build();
+ CommitLog.instance.add(rm);
+ throw new AssertionError("mutation larger than limit was accepted");
}
protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception
@@@ -333,49 -369,10 +352,50 @@@
testRecovery(out.toByteArray(), CommitLogReplayException.class);
}
+ /**
+ * Create a temporary commit log file with an appropriate descriptor at the head.
+ *
+ * @return the commit log file reference and the first position after the descriptor in the file
+ * (so that subsequent writes happen at the correct file location).
+ */
+ protected Pair<File, Integer> tmpFile() throws IOException
+ {
+ EncryptionContext encryptionContext = DatabaseDescriptor.getEncryptionContext();
+ CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.current_version,
+ CommitLogSegment.getNextId(),
+ DatabaseDescriptor.getCommitLogCompression(),
+ encryptionContext);
+
- // if we're testing encryption, we need to write out a cipher IV to the descriptor headers
- Map<String, String> additionalHeaders = new HashMap<>();
- if (encryptionContext.isEnabled())
- {
- byte[] buf = new byte[16];
- new Random().nextBytes(buf);
- additionalHeaders.put(EncryptionContext.ENCRYPTION_IV, Hex.bytesToHex(buf));
- }
+
+ ByteBuffer buf = ByteBuffer.allocate(1024);
- CommitLogDescriptor.writeHeader(buf, desc, additionalHeaders);
++ CommitLogDescriptor.writeHeader(buf, desc, getAdditionalHeaders(encryptionContext));
+ buf.flip();
+ int positionAfterHeader = buf.limit() + 1;
+
- File logFile = new File(logDirectory, desc.fileName());
- logFile.deleteOnExit();
++ File logFile = new File(DatabaseDescriptor.getCommitLogLocation(), desc.fileName());
+
+ try (OutputStream lout = new FileOutputStream(logFile))
+ {
+ lout.write(buf.array(), 0, buf.limit());
+ }
+
+ return Pair.create(logFile, positionAfterHeader);
+ }
+
++ private Map<String, String> getAdditionalHeaders(EncryptionContext encryptionContext)
++ {
++ if (!encryptionContext.isEnabled())
++ return Collections.emptyMap();
++
++ // if we're testing encryption, we need to write out a cipher IV to the descriptor headers
++ byte[] buf = new byte[16];
++ new Random().nextBytes(buf);
++ return Collections.singletonMap(EncryptionContext.ENCRYPTION_IV, Hex.bytesToHex(buf));
++ }
++
protected File tmpFile(int version) throws IOException
{
File logFile = File.createTempFile("CommitLog-" + version + "-", ".log");
-- logFile.deleteOnExit();
assert logFile.length() == 0;
return logFile;
}
@@@ -397,9 -394,9 +417,9 @@@
File logFile = tmpFile(desc.version);
CommitLogDescriptor fromFile = CommitLogDescriptor.fromFileName(logFile.getName());
// Change id to match file.
- desc = new CommitLogDescriptor(desc.version, fromFile.id, desc.compression);
+ desc = new CommitLogDescriptor(desc.version, fromFile.id, desc.compression, desc.getEncryptionContext());
ByteBuffer buf = ByteBuffer.allocate(1024);
-- CommitLogDescriptor.writeHeader(buf, desc);
++ CommitLogDescriptor.writeHeader(buf, desc, getAdditionalHeaders(desc.getEncryptionContext()));
try (OutputStream lout = new FileOutputStream(logFile))
{
lout.write(buf.array(), 0, buf.position());
@@@ -440,11 -437,11 +460,8 @@@
protected void runExpecting(Callable<Void> r, Class<?> expected)
{
-- JVMStabilityInspector.Killer originalKiller;
-- KillerForTests killerForTests;
--
-- killerForTests = new KillerForTests();
-- originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
++ KillerForTests killerForTests = new KillerForTests();
++ JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests);
Throwable caught = null;
try
@@@ -466,21 -463,8 +483,23 @@@
protected void testRecovery(final byte[] logData, Class<?> expected) throws Exception
{
++ ParameterizedClass commitLogCompression = DatabaseDescriptor.getCommitLogCompression();
++ EncryptionContext encryptionContext = DatabaseDescriptor.getEncryptionContext();
runExpecting(() -> testRecovery(logData, CommitLogDescriptor.VERSION_20), expected);
- runExpecting(() -> testRecovery(new CommitLogDescriptor(4, null, EncryptionContextGenerator.createDisabledContext()), logData), expected);
- runExpecting(() -> testRecovery(new CommitLogDescriptor(4, null), logData), expected);
++ runExpecting(() -> testRecovery(new CommitLogDescriptor(4, commitLogCompression, encryptionContext), logData), expected);
+ }
+
+ protected void testRecovery(byte[] logData) throws Exception
+ {
+ Pair<File, Integer> pair = tmpFile();
+ try (RandomAccessFile raf = new RandomAccessFile(pair.left, "rw"))
+ {
+ raf.seek(pair.right);
+ raf.write(logData);
+ raf.close();
+
+ CommitLog.instance.recover(pair.left); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/
+ }
}
@Test
@@@ -489,7 -473,7 +508,6 @@@
boolean originalState = DatabaseDescriptor.isAutoSnapshot();
try
{
-- CommitLog.instance.resetUnsafe(true);
boolean prev = DatabaseDescriptor.isAutoSnapshot();
DatabaseDescriptor.setAutoSnapshot(false);
ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
@@@ -549,183 -532,5 +566,103 @@@
DatabaseDescriptor.setAutoSnapshot(originalState);
}
}
+
+ @Test
- public void replay_StandardMmapped() throws IOException
- {
- ParameterizedClass originalCompression = DatabaseDescriptor.getCommitLogCompression();
- EncryptionContext originalEncryptionContext = DatabaseDescriptor.getEncryptionContext();
- try
- {
- DatabaseDescriptor.setCommitLogCompression(null);
- DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createDisabledContext());
- CommitLog.instance.resetUnsafe(true);
- replaySimple(CommitLog.instance);
- replayWithDiscard(CommitLog.instance);
- }
- finally
- {
- DatabaseDescriptor.setCommitLogCompression(originalCompression);
- DatabaseDescriptor.setEncryptionContext(originalEncryptionContext);
- CommitLog.instance.resetUnsafe(true);
- }
- }
-
- @Test
- public void replay_Compressed_LZ4() throws IOException
- {
- replay_Compressed(new ParameterizedClass(LZ4Compressor.class.getName(), Collections.<String, String>emptyMap()));
- }
-
- @Test
- public void replay_Compressed_Snappy() throws IOException
- {
- replay_Compressed(new ParameterizedClass(SnappyCompressor.class.getName(), Collections.<String, String>emptyMap()));
- }
-
- @Test
- public void replay_Compressed_Deflate() throws IOException
- {
- replay_Compressed(new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap()));
- }
-
- private void replay_Compressed(ParameterizedClass parameterizedClass) throws IOException
- {
- ParameterizedClass originalCompression = DatabaseDescriptor.getCommitLogCompression();
- EncryptionContext originalEncryptionContext = DatabaseDescriptor.getEncryptionContext();
- try
- {
- DatabaseDescriptor.setCommitLogCompression(parameterizedClass);
- DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createDisabledContext());
- CommitLog.instance.resetUnsafe(true);
-
- replaySimple(CommitLog.instance);
- replayWithDiscard(CommitLog.instance);
- }
- finally
- {
- DatabaseDescriptor.setCommitLogCompression(originalCompression);
- DatabaseDescriptor.setEncryptionContext(originalEncryptionContext);
- CommitLog.instance.resetUnsafe(true);
- }
- }
-
- @Test
- public void replay_Encrypted() throws IOException
- {
- ParameterizedClass originalCompression = DatabaseDescriptor.getCommitLogCompression();
- EncryptionContext originalEncryptionContext = DatabaseDescriptor.getEncryptionContext();
- try
- {
- DatabaseDescriptor.setCommitLogCompression(null);
- DatabaseDescriptor.setEncryptionContext(EncryptionContextGenerator.createContext(true));
- CommitLog.instance.resetUnsafe(true);
-
- replaySimple(CommitLog.instance);
- replayWithDiscard(CommitLog.instance);
- }
- finally
- {
- DatabaseDescriptor.setCommitLogCompression(originalCompression);
- DatabaseDescriptor.setEncryptionContext(originalEncryptionContext);
- CommitLog.instance.resetUnsafe(true);
- }
- }
-
- private void replaySimple(CommitLog commitLog) throws IOException
++ public void replaySimple() throws IOException
+ {
+ int cellCount = 0;
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+ final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k1")
+ .clustering("bytes")
+ .add("val", bytes("this is a string"))
+ .build();
+ cellCount += 1;
- commitLog.add(rm1);
++ CommitLog.instance.add(rm1);
+
+ final Mutation rm2 = new RowUpdateBuilder(cfs.metadata, 0, "k2")
+ .clustering("bytes")
+ .add("val", bytes("this is a string"))
+ .build();
+ cellCount += 1;
- commitLog.add(rm2);
++ CommitLog.instance.add(rm2);
+
- commitLog.sync(true);
++ CommitLog.instance.sync(true);
+
- Replayer replayer = new Replayer(commitLog, ReplayPosition.NONE);
- List<String> activeSegments = commitLog.getActiveSegmentNames();
++ Replayer replayer = new Replayer(CommitLog.instance, ReplayPosition.NONE);
++ List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
+ Assert.assertFalse(activeSegments.isEmpty());
+
- File[] files = new File(commitLog.location).listFiles((file, name) -> activeSegments.contains(name));
++ File[] files = new File(CommitLog.instance.location).listFiles((file, name) -> activeSegments.contains(name));
+ replayer.recover(files);
+
+ assertEquals(cellCount, replayer.cells);
+ }
+
- private void replayWithDiscard(CommitLog commitLog) throws IOException
++ @Test
++ public void replayWithDiscard() throws IOException
+ {
+ int cellCount = 0;
+ int max = 1024;
+ int discardPosition = (int)(max * .8); // an arbitrary number of entries that we'll skip on the replay
+ ReplayPosition replayPosition = null;
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+
+ for (int i = 0; i < max; i++)
+ {
+ final Mutation rm1 = new RowUpdateBuilder(cfs.metadata, 0, "k" + 1)
+ .clustering("bytes")
+ .add("val", bytes("this is a string"))
+ .build();
- ReplayPosition position = commitLog.add(rm1);
++ ReplayPosition position = CommitLog.instance.add(rm1);
+
+ if (i == discardPosition)
+ replayPosition = position;
+ if (i > discardPosition)
+ {
+ cellCount += 1;
+ }
+ }
+
- commitLog.sync(true);
++ CommitLog.instance.sync(true);
+
- Replayer replayer = new Replayer(commitLog, replayPosition);
- List<String> activeSegments = commitLog.getActiveSegmentNames();
++ Replayer replayer = new Replayer(CommitLog.instance, replayPosition);
++ List<String> activeSegments = CommitLog.instance.getActiveSegmentNames();
+ Assert.assertFalse(activeSegments.isEmpty());
+
- File[] files = new File(commitLog.location).listFiles((file, name) -> activeSegments.contains(name));
++ File[] files = new File(CommitLog.instance.location).listFiles((file, name) -> activeSegments.contains(name));
+ replayer.recover(files);
+
+ assertEquals(cellCount, replayer.cells);
+ }
+
+ class Replayer extends CommitLogReplayer
+ {
+ private final ReplayPosition filterPosition;
+ int cells;
+ int skipped;
+
+ Replayer(CommitLog commitLog, ReplayPosition filterPosition)
+ {
+ super(commitLog, filterPosition, Collections.emptyMap(), ReplayFilter.create());
+ this.filterPosition = filterPosition;
+ }
+
+ @SuppressWarnings("resource")
+ void replayMutation(byte[] inputBuffer, int size, final int entryLocation, final CommitLogDescriptor desc) throws IOException
+ {
+ if (entryLocation <= filterPosition.position)
+ {
+ // Skip over this mutation.
+ skipped++;
+ return;
+ }
+
+ FastByteArrayInputStream bufIn = new FastByteArrayInputStream(inputBuffer, 0, size);
+ Mutation mutation = Mutation.serializer.deserialize(new DataInputPlus.DataInputStreamPlus(bufIn), desc.getMessagingVersion(), SerializationHelper.Flag.LOCAL);
+ for (PartitionUpdate partitionUpdate : mutation.getPartitionUpdates())
+ for (Row row : partitionUpdate)
+ cells += Iterables.size(row.cells());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc6ffc25/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
index 69764e6,3538bd1..c8a6033
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
@@@ -100,12 -97,11 +100,12 @@@ public class CommitLogUpgradeTestMake
public void makeLog() throws IOException, InterruptedException
{
CommitLog commitLog = CommitLog.instance;
- System.out.format("\nUsing commit log size %dmb, compressor %s, sync %s%s\n",
+ System.out.format("\nUsing commit log size: %dmb, compressor: %s, encryption: %s, sync: %s, %s\n",
mb(DatabaseDescriptor.getCommitLogSegmentSize()),
- commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
- commitLog.encryptionContext.isEnabled() ? "enabled" : "none",
+ commitLog.configuration.getCompressorName(),
++ commitLog.configuration.useEncryption(),
commitLog.executor.getClass().getSimpleName(),
- randomSize ? " random size" : "");
+ randomSize ? "random size" : "");
final List<CommitlogExecutor> threads = new ArrayList<>();
ScheduledExecutorService scheduled = startThreads(commitLog, threads);
[4/4] cassandra git commit: Merge branch cassandra-3.7 into trunk
Posted by bl...@apache.org.
Merge branch cassandra-3.7 into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/adfbf518
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/adfbf518
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/adfbf518
Branch: refs/heads/trunk
Commit: adfbf518e041595a481cd902a033e6b081f50f82
Parents: eb5a59a dc6ffc2
Author: Benjamin Lerer <b....@gmail.com>
Authored: Thu Jun 2 12:51:58 2016 +0200
Committer: Benjamin Lerer <b....@gmail.com>
Committed: Thu Jun 2 12:52:10 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/db/commitlog/CommitLog.java | 102 +++++++++-
.../db/commitlog/CommitLogSegment.java | 15 +-
.../db/commitlog/CommitLogSegmentManager.java | 17 +-
.../db/commitlog/CompressedSegment.java | 4 +-
.../db/commitlog/EncryptedSegment.java | 4 +-
.../db/commitlog/CommitLogStressTest.java | 12 +-
.../db/RecoveryManagerFlushedTest.java | 40 ++++
.../db/RecoveryManagerMissingHeaderTest.java | 38 +++-
.../cassandra/db/RecoveryManagerTest.java | 167 ++++++++++-------
.../db/RecoveryManagerTruncateTest.java | 38 ++++
.../db/commitlog/CommitLogDescriptorTest.java | 3 +-
.../cassandra/db/commitlog/CommitLogTest.java | 187 ++++++-------------
.../db/commitlog/CommitLogUpgradeTestMaker.java | 4 +-
14 files changed, 407 insertions(+), 225 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/adfbf518/CHANGES.txt
----------------------------------------------------------------------
[2/4] cassandra git commit: Merge branch cassandra-2.2 into
cassandra-3.0
Posted by bl...@apache.org.
Merge branch cassandra-2.2 into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1e826951
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1e826951
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1e826951
Branch: refs/heads/trunk
Commit: 1e82695115c7f191d4de60a92f7b9fd078ebbc68
Parents: 7eb4647 6c445d6
Author: Benjamin Lerer <b....@gmail.com>
Authored: Thu Jun 2 12:36:14 2016 +0200
Committer: Benjamin Lerer <b....@gmail.com>
Committed: Thu Jun 2 12:44:45 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/db/commitlog/CommitLog.java | 71 +++++++++-
.../db/commitlog/CommitLogSegment.java | 7 +-
.../db/commitlog/CommitLogSegmentManager.java | 37 +++---
.../db/commitlog/CompressedSegment.java | 6 +-
.../db/commitlog/CommitLogStressTest.java | 2 +-
.../db/RecoveryManagerFlushedTest.java | 38 +++++-
.../db/RecoveryManagerMissingHeaderTest.java | 36 ++++-
.../cassandra/db/RecoveryManagerTest.java | 47 +++++--
.../db/RecoveryManagerTruncateTest.java | 42 +++++-
.../db/commitlog/CommitLogDescriptorTest.java | 102 ++++++++++++++
.../cassandra/db/commitlog/CommitLogTest.java | 132 +++++++------------
.../db/commitlog/CommitLogUpgradeTestMaker.java | 2 +-
13 files changed, 389 insertions(+), 134 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 0cafa83,9752d16..70da4ad
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,18 -1,5 +1,19 @@@
-2.2.7
+3.0.7
+ * Fix Directories instantiations where CFS.initialDirectories should be used (CASSANDRA-11849)
+ * Avoid referencing DatabaseDescriptor in AbstractType (CASSANDRA-11912)
+ * Fix sstables not being protected from removal during index build (CASSANDRA-11905)
+ * cqlsh: Suppress stack trace from Read/WriteFailures (CASSANDRA-11032)
+ * Remove unneeded code to repair index summaries that have
+ been improperly down-sampled (CASSANDRA-11127)
+ * Avoid WriteTimeoutExceptions during commit log replay due to materialized
+ view lock contention (CASSANDRA-11891)
+ * Prevent OOM failures on SSTable corruption, improve tests for corruption detection (CASSANDRA-9530)
+ * Use CFS.initialDirectories when clearing snapshots (CASSANDRA-11705)
+ * Allow compaction strategies to disable early open (CASSANDRA-11754)
+ * Refactor Materialized View code (CASSANDRA-11475)
+ * Update Java Driver (CASSANDRA-11615)
+Merged from 2.2:
+ * Run CommitLog tests with different compression settings (CASSANDRA-9039)
* cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664)
* Avoid showing estimated key as -1 in tablestats (CASSANDRA-11587)
* Fix possible race condition in CommitLog.recover (CASSANDRA-11743)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index c0e12c5,460ecfe..dcdd855
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@@ -70,11 -70,10 +70,10 @@@ public class CommitLog implements Commi
final CommitLogMetrics metrics;
final AbstractCommitLogService executor;
- final ICompressor compressor;
- public ParameterizedClass compressorClass;
+ volatile Configuration configuration;
final public String location;
- static private CommitLog construct()
+ private static CommitLog construct()
{
CommitLog log = new CommitLog(DatabaseDescriptor.getCommitLogLocation(), CommitLogArchiver.construct());
@@@ -433,6 -432,14 +431,14 @@@
}
/**
+ * FOR TESTING PURPOSES.
+ */
+ public void resetConfiguration()
+ {
- this.configuration = new Configuration(DatabaseDescriptor.getCommitLogCompression());
++ configuration = new Configuration(DatabaseDescriptor.getCommitLogCompression());
+ }
+
+ /**
* FOR TESTING PURPOSES. See CommitLogAllocator
*/
public int restartUnsafe() throws IOException
@@@ -487,4 -494,59 +493,59 @@@
throw new AssertionError(DatabaseDescriptor.getCommitFailurePolicy());
}
}
+
+ public static final class Configuration
+ {
+ /**
+ * The compressor class.
+ */
+ private final ParameterizedClass compressorClass;
+
+ /**
+ * The compressor used to compress the segments.
+ */
+ private final ICompressor compressor;
+
+ public Configuration(ParameterizedClass compressorClass)
+ {
+ this.compressorClass = compressorClass;
- this.compressor = compressorClass != null ? CompressionParameters.createCompressor(compressorClass) : null;
++ this.compressor = compressorClass != null ? CompressionParams.createCompressor(compressorClass) : null;
+ }
+
+ /**
+ * Checks if the segments must be compressed.
+ * @return <code>true</code> if the segments must be compressed, <code>false</code> otherwise.
+ */
+ public boolean useCompression()
+ {
+ return compressor != null;
+ }
+
+ /**
+ * Returns the compressor used to compress the segments.
+ * @return the compressor used to compress the segments
+ */
+ public ICompressor getCompressor()
+ {
+ return compressor;
+ }
+
+ /**
+ * Returns the compressor class.
+ * @return the compressor class
+ */
+ public ParameterizedClass getCompressorClass()
+ {
+ return compressorClass;
+ }
+
+ /**
+ * Returns the compressor name.
+ * @return the compressor name.
+ */
+ public String getCompressorName()
+ {
+ return useCompression() ? compressor.getClass().getSimpleName() : "none";
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index aedc9da,ba28f3e..27c05b4
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@@ -118,22 -117,12 +118,23 @@@ public abstract class CommitLogSegmen
final CommitLog commitLog;
public final CommitLogDescriptor descriptor;
- static CommitLogSegment createSegment(CommitLog commitLog)
+ static CommitLogSegment createSegment(CommitLog commitLog, Runnable onClose)
{
- return commitLog.compressor != null ? new CompressedSegment(commitLog, onClose) : new MemoryMappedSegment(commitLog);
- return commitLog.configuration.useCompression() ? new CompressedSegment(commitLog)
++ return commitLog.configuration.useCompression() ? new CompressedSegment(commitLog, onClose)
+ : new MemoryMappedSegment(commitLog);
}
+ /**
+ * Checks if the segments use a buffer pool.
+ *
+ * @param commitLog the commit log
+ * @return <code>true</code> if the segments use a buffer pool, <code>false</code> otherwise.
+ */
+ static boolean usesBufferPool(CommitLog commitLog)
+ {
- return commitLog.compressor != null;
++ return commitLog.configuration.useCompression();
+ }
+
static long getNextId()
{
return idBase + nextId.getAndIncrement();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index 2ee4eed,8670fd7..66ad6a3
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@@ -21,11 -21,11 +21,9 @@@ import java.io.File
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
--import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
--import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
@@@ -34,24 -34,25 +32,26 @@@ import java.util.concurrent.LinkedBlock
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
--import com.google.common.annotations.VisibleForTesting;
--import com.google.common.collect.Iterables;
--import com.google.common.util.concurrent.*;
-
--import org.slf4j.Logger;
--import org.slf4j.LoggerFactory;
--
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
++import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
import org.apache.cassandra.io.util.FileUtils;
--import org.apache.cassandra.utils.Pair;
--import org.apache.cassandra.utils.concurrent.WaitQueue;
import org.apache.cassandra.utils.JVMStabilityInspector;
++import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;
++import org.apache.cassandra.utils.concurrent.WaitQueue;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
--import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
++import com.google.common.annotations.VisibleForTesting;
++import com.google.common.collect.Iterables;
++import com.google.common.util.concurrent.Futures;
++import com.google.common.util.concurrent.ListenableFuture;
++import com.google.common.util.concurrent.Runnables;
++import com.google.common.util.concurrent.Uninterruptibles;
/**
* Performs eager-creation of commit log segments in a background thread. All the
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index 0ec0bca,219709b..c73a30a
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@@ -66,11 -58,10 +66,11 @@@ public class CompressedSegment extends
/**
* Constructs a new segment file.
*/
- CompressedSegment(CommitLog commitLog)
+ CompressedSegment(CommitLog commitLog, Runnable onClose)
{
super(commitLog);
- this.compressor = commitLog.compressor;
+ this.compressor = commitLog.configuration.getCompressor();
+ this.onClose = onClose;
try
{
channel.write((ByteBuffer) buffer.duplicate().flip());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
index e24af0f,0000000..d06c112
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
@@@ -1,95 -1,0 +1,131 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.db;
+
++import java.io.IOException;
++import java.util.Arrays;
++import java.util.Collection;
++import java.util.Collections;
++
++import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
++import org.junit.runner.RunWith;
++import org.junit.runners.Parameterized;
++import org.junit.runners.Parameterized.Parameters;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.SchemaLoader;
- import org.apache.cassandra.db.compaction.CompactionManager;
++import org.apache.cassandra.config.DatabaseDescriptor;
++import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.db.commitlog.CommitLog;
++import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.exceptions.ConfigurationException;
++import org.apache.cassandra.io.compress.DeflateCompressor;
++import org.apache.cassandra.io.compress.LZ4Compressor;
++import org.apache.cassandra.io.compress.SnappyCompressor;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.SchemaKeyspace;
+import org.apache.cassandra.utils.FBUtilities;
+
++@RunWith(Parameterized.class)
+public class RecoveryManagerFlushedTest
+{
+ private static Logger logger = LoggerFactory.getLogger(RecoveryManagerFlushedTest.class);
+
+ private static final String KEYSPACE1 = "RecoveryManager2Test";
+ private static final String CF_STANDARD1 = "Standard1";
+ private static final String CF_STANDARD2 = "Standard2";
+
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE1,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1),
+ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2));
+ }
+
++ public RecoveryManagerFlushedTest(ParameterizedClass commitLogCompression)
++ {
++ DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
++ }
++
++ @Before
++ public void setUp() throws IOException
++ {
++ CommitLog.instance.resetUnsafe(true);
++ }
++
++ @Parameters()
++ public static Collection<Object[]> generateData()
++ {
++ return Arrays.asList(new Object[][] {
++ { null }, // No compression
++ { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
++ { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
++ { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
++ }
++
+ @Test
+ /* test that commit logs do not replay flushed data */
+ public void testWithFlush() throws Exception
+ {
+ // Flush everything that may be in the commit log now to start fresh
+ FBUtilities.waitOnFutures(Keyspace.open(SystemKeyspace.NAME).flush());
+ FBUtilities.waitOnFutures(Keyspace.open(SchemaKeyspace.NAME).flush());
+
+
+ CompactionManager.instance.disableAutoCompaction();
+
+ // add a row to another CF so we test skipping mutations within a not-entirely-flushed CF
+ insertRow("Standard2", "key");
+
+ for (int i = 0; i < 100; i++)
+ {
+ String key = "key" + i;
+ insertRow("Standard1", key);
+ }
+
+ Keyspace keyspace1 = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore cfs = keyspace1.getColumnFamilyStore("Standard1");
+ logger.debug("forcing flush");
+ cfs.forceBlockingFlush();
+
+ logger.debug("begin manual replay");
+ // replay the commit log (nothing on Standard1 should be replayed since everything was flushed, so only the row on Standard2
+ // will be replayed)
+ int replayed = CommitLog.instance.resetUnsafe(false);
+ assert replayed == 1 : "Expecting only 1 replayed mutation, got " + replayed;
+ }
+
+ private void insertRow(String cfname, String key)
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+ new RowUpdateBuilder(cfs.metadata, 0, key)
+ .clustering("c")
+ .add("val", "val1")
+ .build()
+ .apply();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
index 9275dae,0000000..8ac7c5d
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
@@@ -1,88 -1,0 +1,120 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.File;
+import java.io.IOException;
++import java.util.Arrays;
++import java.util.Collection;
++import java.util.Collections;
+
+import org.junit.Assert;
++import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
++import org.junit.runner.RunWith;
++import org.junit.runners.Parameterized;
++import org.junit.runners.Parameterized.Parameters;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
- import org.apache.cassandra.db.rows.AbstractUnfilteredRowIterator;
- import org.apache.cassandra.db.rows.UnfilteredRowIterator;
++import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.db.commitlog.CommitLog;
++import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.exceptions.ConfigurationException;
++import org.apache.cassandra.io.compress.DeflateCompressor;
++import org.apache.cassandra.io.compress.LZ4Compressor;
++import org.apache.cassandra.io.compress.SnappyCompressor;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.schema.KeyspaceParams;
+
++@RunWith(Parameterized.class) // every test method runs once per commit log compression setting returned by generateData()
+public class RecoveryManagerMissingHeaderTest
+{
+ private static final String KEYSPACE1 = "RecoveryManager3Test1";
+ private static final String CF_STANDARD1 = "Standard1";
+
+ private static final String KEYSPACE2 = "RecoveryManager3Test2";
+ private static final String CF_STANDARD3 = "Standard3";
+
++ public RecoveryManagerMissingHeaderTest(ParameterizedClass commitLogCompression) // receives one entry of generateData(); null means no compression
++ {
++ DatabaseDescriptor.setCommitLogCompression(commitLogCompression); // installs the setting on the shared DatabaseDescriptor, so it stays in effect after this instance is gone
++ }
++
++ @Before
++ public void setUp() throws IOException
++ {
++ CommitLog.instance.resetUnsafe(true); // NOTE(review): presumably restarts the commit log and deletes existing segments so the compression set in the constructor takes effect — confirm against CommitLog.resetUnsafe
++ }
++
++ @Parameters()
++ public static Collection<Object[]> generateData() // one Object[] per run; its single element is passed to the constructor
++ {
++ return Arrays.asList(new Object[][] {
++ { null }, // No compression
++ { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
++ { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
++ { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
++ }
++
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException // creates the two keyspaces, each with one standard table, once for the whole class
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE1,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1));
+ SchemaLoader.createKeyspace(KEYSPACE2,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE2, CF_STANDARD3));
+ }
+
+ @Test
+ public void testMissingHeader() throws IOException // writes one row per keyspace, clears the tables, deletes ".header" files, replays, then expects both rows to be readable again
+ {
+ Keyspace keyspace1 = Keyspace.open(KEYSPACE1);
+ Keyspace keyspace2 = Keyspace.open(KEYSPACE2);
+
+ DecoratedKey dk = Util.dk("keymulti");
+ UnfilteredRowIterator upd1 = Util.apply(new RowUpdateBuilder(keyspace1.getColumnFamilyStore(CF_STANDARD1).metadata, 1L, 0, "keymulti")
+ .clustering("col1").add("val", "1")
+ .build());
+
+ UnfilteredRowIterator upd2 = Util.apply(new RowUpdateBuilder(keyspace2.getColumnFamilyStore(CF_STANDARD3).metadata, 1L, 0, "keymulti")
+ .clustering("col1").add("val", "1")
+ .build());
+
+ keyspace1.getColumnFamilyStore("Standard1").clearUnsafe(); // literal equals CF_STANDARD1; NOTE(review): clearUnsafe presumably drops the table's current data without flushing — confirm
+ keyspace2.getColumnFamilyStore("Standard3").clearUnsafe(); // literal equals CF_STANDARD3
+
+ // delete every file ending in ".header" from the commit log directory, so the replay below has to work without them
+ for (File file : new File(DatabaseDescriptor.getCommitLogLocation()).listFiles()) // NOTE(review): File.listFiles() returns null if the directory cannot be listed, which would throw NullPointerException here
+ {
+ if (file.getName().endsWith(".header"))
+ FileUtils.deleteWithConfirm(file);
+ }
+
+ CommitLog.instance.resetUnsafe(false); // NOTE(review): presumably restarts the commit log and replays the existing segments — confirm against CommitLog.resetUnsafe
+
+ Assert.assertTrue(Util.equal(upd1, Util.getOnlyPartitionUnfiltered(Util.cmd(keyspace1.getColumnFamilyStore(CF_STANDARD1), dk).build()).unfilteredIterator()));
+ Assert.assertTrue(Util.equal(upd2, Util.getOnlyPartitionUnfiltered(Util.cmd(keyspace2.getColumnFamilyStore(CF_STANDARD3), dk).build()).unfilteredIterator()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
index baf9466,5676b99..397030a
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
@@@ -22,35 -25,34 +25,41 @@@ import java.util.Collections
import java.util.Date;
import java.util.concurrent.TimeUnit;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import org.apache.cassandra.OrderedJUnit4ClassRunner;
- import org.apache.cassandra.Util;
- import org.apache.cassandra.config.ColumnDefinition;
- import org.apache.cassandra.db.rows.*;
- import org.apache.cassandra.db.context.CounterContext;
- import org.apache.cassandra.exceptions.ConfigurationException;
-
import org.junit.Assert;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
+ import org.junit.runners.Parameterized;
+ import org.junit.runners.Parameterized.Parameters;
- import static org.junit.Assert.assertEquals;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.Util;
++import org.apache.cassandra.config.ColumnDefinition;
+ import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.KSMetaData;
+ import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.CommitLogArchiver;
-import org.apache.cassandra.db.marshal.CounterColumnType;
++import org.apache.cassandra.db.context.CounterContext;
++import org.apache.cassandra.db.rows.Row;
++import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+ import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.compress.DeflateCompressor;
+ import org.apache.cassandra.io.compress.LZ4Compressor;
+ import org.apache.cassandra.io.compress.SnappyCompressor;
-import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.ByteBufferUtil;
- @RunWith(OrderedJUnit4ClassRunner.class)
-import static org.apache.cassandra.Util.cellname;
-import static org.apache.cassandra.Util.column;
-import static org.apache.cassandra.db.KeyspaceTest.assertColumns;
++import static org.junit.Assert.assertEquals;
+
+ @RunWith(Parameterized.class)
public class RecoveryManagerTest
{
+ private static Logger logger = LoggerFactory.getLogger(RecoveryManagerTest.class);
+
private static final String KEYSPACE1 = "RecoveryManagerTest1";
private static final String CF_STANDARD1 = "Standard1";
private static final String CF_COUNTER1 = "Counter1";
@@@ -71,13 -75,20 +80,27 @@@
SchemaLoader.standardCFMD(KEYSPACE2, CF_STANDARD3));
}
+ @Before
+ public void clearData() // truncates the three tables this class writes to before each test method
+ {
+ Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1).truncateBlocking();
+ Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_COUNTER1).truncateBlocking();
+ Keyspace.open(KEYSPACE2).getColumnFamilyStore(CF_STANDARD3).truncateBlocking();
+ }
+ public RecoveryManagerTest(ParameterizedClass commitLogCompression) // called by the Parameterized runner with one entry of generateData(); null means no compression
+ {
+ DatabaseDescriptor.setCommitLogCompression(commitLogCompression); // sets the value on the shared DatabaseDescriptor, not on this instance
+ }
+
+ @Parameters()
+ public static Collection<Object[]> generateData() // one Object[] per parameterized run; its single element is the constructor argument
+ {
+ return Arrays.asList(new Object[][] {
+ { null }, // No compression
- { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.<String, String>emptyMap()) },
- { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.<String, String>emptyMap()) },
- { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap()) } });
++ { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
++ { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
++ { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
+ }
@Test
public void testNothingToRecover() throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
index 7c8ab7d,769316f..5a59f1c
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
@@@ -18,18 -18,33 +18,30 @@@
*/
package org.apache.cassandra.db;
-import static org.apache.cassandra.Util.column;
-import static org.junit.Assert.*;
-
import java.io.IOException;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Collections;
+
+ import org.junit.Before;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ import org.junit.runner.RunWith;
+ import org.junit.runners.Parameterized;
+ import org.junit.runners.Parameterized.Parameters;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
+ import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.KSMetaData;
+ import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.compress.DeflateCompressor;
+ import org.apache.cassandra.io.compress.LZ4Compressor;
+ import org.apache.cassandra.io.compress.SnappyCompressor;
-import org.apache.cassandra.locator.SimpleStrategy;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.schema.KeyspaceParams;
+
- import org.junit.BeforeClass;
- import org.junit.Test;
-
- import static org.junit.Assert.*;
++import static org.junit.Assert.assertTrue;
/**
* Test for the truncate operation.
@@@ -38,7 -54,29 +51,28 @@@ public class RecoveryManagerTruncateTes
{
private static final String KEYSPACE1 = "RecoveryManagerTruncateTest";
private static final String CF_STANDARD1 = "Standard1";
- private static final String CF_STANDARD2 = "Standard2";
+ public RecoveryManagerTruncateTest(ParameterizedClass commitLogCompression) // called by the Parameterized runner with one entry of generateData(); null means no compression
+ {
+ DatabaseDescriptor.setCommitLogCompression(commitLogCompression); // sets the value on the shared DatabaseDescriptor, not on this instance
+ }
+
+ @Before
+ public void setUp() throws IOException // runs before each test method, after the constructor has set the compression
+ {
+ CommitLog.instance.resetUnsafe(true); // NOTE(review): presumably restarts the commit log and deletes existing segments so the new compression setting is used — confirm against CommitLog.resetUnsafe
+ }
+
+ @Parameters()
+ public static Collection<Object[]> generateData() // one Object[] per parameterized run; its single element is the constructor argument
+ {
+ return Arrays.asList(new Object[][] {
+ { null }, // No compression
- { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.<String, String>emptyMap()) },
- { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.<String, String>emptyMap()) },
- { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap()) } });
++ { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
++ { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
++ { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
+ }
+
@BeforeClass
public static void defineSchema() throws ConfigurationException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
index 0000000,8d63959..898c19f
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
@@@ -1,0 -1,103 +1,102 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.cassandra.db.commitlog;
+
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
+ import java.util.HashMap;
+ import java.util.Map;
+
+ import com.google.common.collect.ImmutableMap;
+
+ import org.junit.Test;
+
+ import org.apache.cassandra.config.ParameterizedClass;
+ import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.util.ByteBufferDataInput;
-import org.apache.cassandra.io.util.FileDataInput;
++import org.apache.cassandra.io.util.DataInputBuffer;
+ import org.apache.cassandra.net.MessagingService;
+
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertTrue;
+ import static org.junit.Assert.fail;
+
+ public class CommitLogDescriptorTest
+ {
+ @Test
+ public void testVersions() // checks which file names isValid accepts and that the id and messaging version are derived from the file name
+ {
+ assertTrue(CommitLogDescriptor.isValid("CommitLog-1340512736956320000.log"));
+ assertTrue(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000.log"));
+ assertFalse(CommitLogDescriptor.isValid("CommitLog--1340512736956320000.log"));
+ assertFalse(CommitLogDescriptor.isValid("CommitLog--2-1340512736956320000.log"));
+ assertFalse(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000-123.log"));
+
+ assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-2-1340512736956320000.log").id);
+
+ assertEquals(MessagingService.current_version, new CommitLogDescriptor(1340512736956320000L, null).getMessagingVersion());
+ String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log";
+ assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion());
+ }
+
+ private void testDescriptorPersistence(CommitLogDescriptor desc) throws IOException // round trip: writeHeader into a buffer, readHeader back, expect an equal descriptor
+ {
+ ByteBuffer buf = ByteBuffer.allocate(1024);
+ CommitLogDescriptor.writeHeader(buf, desc);
- long length = buf.position();
+ // Append extra bytes after the header so that readHeader is exercised with trailing data following the descriptor.
+ buf.putDouble(0.1);
+ buf.flip();
- try (FileDataInput input = new ByteBufferDataInput(buf, "input", 0, 0))
++
++ try (DataInputBuffer input = new DataInputBuffer(buf, false))
+ {
+ CommitLogDescriptor read = CommitLogDescriptor.readHeader(input);
- assertEquals("Descriptor length", length, input.getFilePointer());
+ assertEquals("Descriptors", desc, read);
+ }
+ }
+
+ @Test
+ public void testDescriptorPersistence() throws IOException // runs the round trip for several versions, without compression and with compressor parameters
+ {
+ testDescriptorPersistence(new CommitLogDescriptor(11, null));
+ testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_21, 13, null));
- testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 15, null));
- testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 17, new ParameterizedClass("LZ4Compressor", null)));
- testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 19,
++ testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 15, null));
++ testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 17, new ParameterizedClass("LZ4Compressor", null)));
++ testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 19,
+ new ParameterizedClass("StubbyCompressor", ImmutableMap.of("parameter1", "value1", "flag2", "55", "argument3", "null"))));
+ }
+
+ @Test
+ public void testDescriptorInvalidParametersSize() throws IOException // expects writeHeader to throw ConfigurationException for a very large compressor parameter map
+ {
- Map<String, String> params = new HashMap<>();
- for (int i=0; i<6000; ++i)
++ final int numberOfParameters = 65535; // NOTE(review): presumably chosen so the serialized parameters exceed the size the header can record — confirm against writeHeader
++ Map<String, String> params = new HashMap<>(numberOfParameters);
++ for (int i=0; i<numberOfParameters; ++i)
+ params.put("key"+i, Integer.toString(i, 16));
+ try {
- CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_22,
++ CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_30,
+ 21,
+ new ParameterizedClass("LZ4Compressor", params));
+ ByteBuffer buf = ByteBuffer.allocate(1024000);
+ CommitLogDescriptor.writeHeader(buf, desc);
+ fail("Parameter object too long should fail on writing descriptor.");
+ } catch (ConfigurationException e)
+ {
+ // expected outcome: writeHeader rejected the oversized parameter map, so there is nothing to handle
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 555cdda,9999b42..39ba886
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -16,23 -16,15 +16,20 @@@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.cassandra.db.commitlog;
- import static junit.framework.Assert.assertTrue;
- import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
- import static org.junit.Assert.assertEquals;
-
-import java.io.*;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
- import java.util.HashMap;
- import java.util.Map;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Collections;
import java.util.UUID;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
@@@ -50,31 -39,54 +44,60 @@@ import org.junit.runners.Parameterized.
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
- import org.apache.cassandra.config.Config.CommitFailurePolicy;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.config.ParameterizedClass;
- import org.apache.cassandra.db.commitlog.CommitLog;
- import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
- import org.apache.cassandra.db.commitlog.ReplayPosition;
- import org.apache.cassandra.db.commitlog.CommitLogSegment;
-import org.apache.cassandra.db.*;
++import org.apache.cassandra.db.ColumnFamilyStore;
++import org.apache.cassandra.db.Keyspace;
++import org.apache.cassandra.db.Mutation;
++import org.apache.cassandra.db.RowUpdateBuilder;
import org.apache.cassandra.db.commitlog.CommitLogReplayer.CommitLogReplayException;
import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.filter.NamesQueryFilter;
++import org.apache.cassandra.db.marshal.AsciiType;
++import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.exceptions.ConfigurationException;
- import org.apache.cassandra.io.util.DataInputBuffer;
+ import org.apache.cassandra.io.compress.DeflateCompressor;
+ import org.apache.cassandra.io.compress.LZ4Compressor;
+ import org.apache.cassandra.io.compress.SnappyCompressor;
-import org.apache.cassandra.locator.SimpleStrategy;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.*;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.KillerForTests;
+import org.apache.cassandra.utils.vint.VIntCoding;
+ import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
++import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.assertTrue;
+
+ @RunWith(Parameterized.class)
public class CommitLogTest
{
private static final String KEYSPACE1 = "CommitLogTest";
private static final String KEYSPACE2 = "CommitLogTestNonDurable";
- private static final String CF1 = "Standard1";
- private static final String CF2 = "Standard2";
+ private static final String STANDARD1 = "Standard1";
+ private static final String STANDARD2 = "Standard2";
+ public CommitLogTest(ParameterizedClass commitLogCompression) // called by the Parameterized runner with one entry of generateData(); null means no compression
+ {
+ DatabaseDescriptor.setCommitLogCompression(commitLogCompression); // sets the value on the shared DatabaseDescriptor, not on this instance
+ }
+
+ @Before
+ public void setUp() throws IOException // runs before each test method, after the constructor has set the compression
+ {
+ CommitLog.instance.resetUnsafe(true); // NOTE(review): presumably restarts the commit log and deletes existing segments so the new compression setting is used — confirm against CommitLog.resetUnsafe
+ }
+
+ @Parameters()
+ public static Collection<Object[]> generateData() // one Object[] per parameterized run; its single element is the constructor argument
+ {
+ return Arrays.asList(new Object[][] {
+ { null }, // No compression
- { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.<String, String>emptyMap()) },
- { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.<String, String>emptyMap()) },
- { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap()) } });
++ { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
++ { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
++ { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
+ }
+
@BeforeClass
public static void defineSchema() throws ConfigurationException
{
@@@ -186,29 -208,21 +209,28 @@@
@Test
public void testDontDeleteIfDirty() throws Exception
{
- CommitLog.instance.resetUnsafe(true);
+ ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+ ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2);
+
// Mutation whose value is a quarter of the configured commit log segment size
- Mutation rm = new Mutation(KEYSPACE1, bytes("k"));
- rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize()/4), 0);
+ Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k")
+ .clustering("bytes")
+ .add("val", ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize()/4))
+ .build();
// Adding it 5 times
- CommitLog.instance.add(rm);
- CommitLog.instance.add(rm);
- CommitLog.instance.add(rm);
- CommitLog.instance.add(rm);
- CommitLog.instance.add(rm);
+ CommitLog.instance.add(m);
+ CommitLog.instance.add(m);
+ CommitLog.instance.add(m);
+ CommitLog.instance.add(m);
+ CommitLog.instance.add(m);
// Adding new mutation on another CF
- Mutation rm2 = new Mutation(KEYSPACE1, bytes("k"));
- rm2.add(CF2, Util.cellname("c1"), ByteBuffer.allocate(4), 0);
- CommitLog.instance.add(rm2);
+ Mutation m2 = new RowUpdateBuilder(cfs2.metadata, 0, "k")
+ .clustering("bytes")
+ .add("val", ByteBuffer.allocate(4))
+ .build();
+ CommitLog.instance.add(m2);
assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments();
@@@ -222,16 -236,10 +244,14 @@@
@Test
public void testDeleteIfNotDirty() throws Exception
{
-- DatabaseDescriptor.getCommitLogSegmentSize();
- CommitLog.instance.resetUnsafe(true);
+ ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+ ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2);
+
// Mutation whose value is one byte less than a quarter of the configured commit log segment size
- Mutation rm = new Mutation(KEYSPACE1, bytes("k"));
- rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/4) - 1), 0);
+ Mutation rm = new RowUpdateBuilder(cfs1.metadata, 0, "k")
+ .clustering("bytes")
+ .add("val", ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/4) - 1))
+ .build();
// Adding it twice (won't change segment)
CommitLog.instance.add(rm);
@@@ -302,12 -294,8 +322,12 @@@
@Test
public void testEqualRecordLimit() throws Exception // adds a mutation whose value is exactly getMaxRecordDataSize() bytes; there are no assertions, so the test passes when add() does not throw
{
- CommitLog.instance.resetUnsafe(true);
- Mutation rm = new Mutation(KEYSPACE1, bytes("k"));
- rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate(getMaxRecordDataSize()), 0);
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+ Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, "k")
+ .clustering("bytes")
+ .add("val", ByteBuffer.allocate(getMaxRecordDataSize()))
+ .build();
++
CommitLog.instance.add(rm);
}
@@@ -448,129 -432,57 +468,69 @@@
}
@Test
- public void testVersions()
- {
- Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-1340512736956320000.log"));
- Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000.log"));
- Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--1340512736956320000.log"));
- Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--2-1340512736956320000.log"));
- Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000-123.log"));
-
- assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-2-1340512736956320000.log").id);
-
- assertEquals(MessagingService.current_version, new CommitLogDescriptor(1340512736956320000L, null).getMessagingVersion());
- String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log";
- assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion());
- }
-
- @Test
public void testTruncateWithoutSnapshot() throws ExecutionException, InterruptedException, IOException // truncates Standard1 with auto-snapshot off, then checks that discarding completed segments leaves a single active segment
{
- boolean prev = DatabaseDescriptor.isAutoSnapshot();
- DatabaseDescriptor.setAutoSnapshot(false);
- ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard1");
- ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard2");
-
- final Mutation rm1 = new Mutation(KEYSPACE1, bytes("k"));
- rm1.add("Standard1", Util.cellname("c1"), ByteBuffer.allocate(100), 0);
- rm1.apply();
- cfs1.truncateBlocking();
- DatabaseDescriptor.setAutoSnapshot(prev);
- final Mutation rm2 = new Mutation(KEYSPACE1, bytes("k"));
- rm2.add("Standard2", Util.cellname("c1"), ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize() / 4), 0);
-
- for (int i = 0 ; i < 5 ; i++)
- CommitLog.instance.add(rm2);
-
- Assert.assertEquals(2, CommitLog.instance.activeSegments());
- ReplayPosition position = CommitLog.instance.getContext();
- for (Keyspace ks : Keyspace.system())
- for (ColumnFamilyStore syscfs : ks.getColumnFamilyStores())
- CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, position);
- CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId, position);
- Assert.assertEquals(1, CommitLog.instance.activeSegments());
+ boolean originalState = DatabaseDescriptor.isAutoSnapshot(); // remembered so the finally block can restore the shared setting even if an assertion fails
+ try
+ {
+ CommitLog.instance.resetUnsafe(true);
+ boolean prev = DatabaseDescriptor.isAutoSnapshot(); // NOTE(review): presumably the same value as originalState, which would make this second read redundant — confirm resetUnsafe does not change the setting
+ DatabaseDescriptor.setAutoSnapshot(false);
+ ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+ ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2);
+
+ new RowUpdateBuilder(cfs1.metadata, 0, "k").clustering("bytes").add("val", ByteBuffer.allocate(100)).build().applyUnsafe();
+ cfs1.truncateBlocking();
+ DatabaseDescriptor.setAutoSnapshot(prev); // restores the setting right after the truncate; the finally block restores it again
+ Mutation m2 = new RowUpdateBuilder(cfs2.metadata, 0, "k")
+ .clustering("bytes")
+ .add("val", ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize() / 4))
+ .build();
+
+ for (int i = 0 ; i < 5 ; i++)
+ CommitLog.instance.add(m2);
+
+ assertEquals(2, CommitLog.instance.activeSegments()); // five values of a quarter segment each exceed one segment, so a second segment is expected
+ ReplayPosition position = CommitLog.instance.getContext();
+ for (Keyspace ks : Keyspace.system())
+ for (ColumnFamilyStore syscfs : ks.getColumnFamilyStores())
+ CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, position);
+ CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId, position);
+ assertEquals(1, CommitLog.instance.activeSegments()); // after discarding for every system table and for cfs2 up to `position`, only one segment should remain active
+ }
+ finally
+ {
+ DatabaseDescriptor.setAutoSnapshot(originalState);
+ }
}
@Test
public void testTruncateWithoutSnapshotNonDurable() throws IOException
{
- CommitLog.instance.resetUnsafe(true);
- boolean prevAutoSnapshot = DatabaseDescriptor.isAutoSnapshot();
- DatabaseDescriptor.setAutoSnapshot(false);
- Keyspace notDurableKs = Keyspace.open(KEYSPACE2);
- Assert.assertFalse(notDurableKs.getMetadata().durableWrites);
- ColumnFamilyStore cfs = notDurableKs.getColumnFamilyStore("Standard1");
- CellNameType type = notDurableKs.getColumnFamilyStore("Standard1").getComparator();
- Mutation rm;
- DecoratedKey dk = Util.dk("key1");
-
- // add data
- rm = new Mutation(KEYSPACE2, dk.getKey());
- rm.add("Standard1", Util.cellname("Column1"), ByteBufferUtil.bytes("abcd"), 0);
- rm.apply();
-
- ReadCommand command = new SliceByNamesReadCommand(KEYSPACE2, dk.getKey(), "Standard1", System.currentTimeMillis(), new NamesQueryFilter(FBUtilities.singleton(Util.cellname("Column1"), type)));
- Row row = command.getRow(notDurableKs);
- Cell col = row.cf.getColumn(Util.cellname("Column1"));
- Assert.assertEquals(col.value(), ByteBuffer.wrap("abcd".getBytes()));
- cfs.truncateBlocking();
- DatabaseDescriptor.setAutoSnapshot(prevAutoSnapshot);
- row = command.getRow(notDurableKs);
- Assert.assertEquals(null, row.cf);
+ boolean originalState = DatabaseDescriptor.getAutoSnapshot();
+ try
+ {
+ DatabaseDescriptor.setAutoSnapshot(false);
+ Keyspace notDurableKs = Keyspace.open(KEYSPACE2);
+ Assert.assertFalse(notDurableKs.getMetadata().params.durableWrites);
+
+ ColumnFamilyStore cfs = notDurableKs.getColumnFamilyStore("Standard1");
+ new RowUpdateBuilder(cfs.metadata, 0, "key1")
+ .clustering("bytes").add("val", ByteBufferUtil.bytes("abcd"))
+ .build()
+ .applyUnsafe();
+
+ assertTrue(Util.getOnlyRow(Util.cmd(cfs).columns("val").build())
+ .cells().iterator().next().value().equals(ByteBufferUtil.bytes("abcd")));
+
+ cfs.truncateBlocking();
+
+ Util.assertEmpty(Util.cmd(cfs).columns("val").build());
+ }
+ finally
+ {
+ DatabaseDescriptor.setAutoSnapshot(originalState);
+ }
}
-
- private void testDescriptorPersistence(CommitLogDescriptor desc) throws IOException
- {
- ByteBuffer buf = ByteBuffer.allocate(1024);
- CommitLogDescriptor.writeHeader(buf, desc);
- // Put some extra data in the stream.
- buf.putDouble(0.1);
- buf.flip();
-
- DataInputBuffer input = new DataInputBuffer(buf, false);
- CommitLogDescriptor read = CommitLogDescriptor.readHeader(input);
- Assert.assertEquals("Descriptors", desc, read);
- }
-
- @Test
- public void testDescriptorPersistence() throws IOException
- {
- testDescriptorPersistence(new CommitLogDescriptor(11, null));
- testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_21, 13, null));
- testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 15, null));
- testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 17, new ParameterizedClass("LZ4Compressor", null)));
- testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 19,
- new ParameterizedClass("StubbyCompressor", ImmutableMap.of("parameter1", "value1", "flag2", "55", "argument3", "null"))));
- }
-
- @Test
- public void testDescriptorInvalidParametersSize() throws IOException
- {
- Map<String, String> params = new HashMap<>();
- for (int i=0; i<65535; ++i)
- params.put("key"+i, Integer.toString(i, 16));
- try {
- CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_30,
- 21,
- new ParameterizedClass("LZ4Compressor", params));
- ByteBuffer buf = ByteBuffer.allocate(1024000);
- CommitLogDescriptor.writeHeader(buf, desc);
- Assert.fail("Parameter object too long should fail on writing descriptor.");
- } catch (ConfigurationException e)
- {
- // correct path
- }
- }
}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
----------------------------------------------------------------------