You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2016/06/02 10:50:54 UTC
[2/3] cassandra git commit: Merge branch cassandra-2.2 into cassandra-3.0
Merge branch cassandra-2.2 into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1e826951
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1e826951
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1e826951
Branch: refs/heads/cassandra-3.7
Commit: 1e82695115c7f191d4de60a92f7b9fd078ebbc68
Parents: 7eb4647 6c445d6
Author: Benjamin Lerer <b....@gmail.com>
Authored: Thu Jun 2 12:36:14 2016 +0200
Committer: Benjamin Lerer <b....@gmail.com>
Committed: Thu Jun 2 12:44:45 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/db/commitlog/CommitLog.java | 71 +++++++++-
.../db/commitlog/CommitLogSegment.java | 7 +-
.../db/commitlog/CommitLogSegmentManager.java | 37 +++---
.../db/commitlog/CompressedSegment.java | 6 +-
.../db/commitlog/CommitLogStressTest.java | 2 +-
.../db/RecoveryManagerFlushedTest.java | 38 +++++-
.../db/RecoveryManagerMissingHeaderTest.java | 36 ++++-
.../cassandra/db/RecoveryManagerTest.java | 47 +++++--
.../db/RecoveryManagerTruncateTest.java | 42 +++++-
.../db/commitlog/CommitLogDescriptorTest.java | 102 ++++++++++++++
.../cassandra/db/commitlog/CommitLogTest.java | 132 +++++++------------
.../db/commitlog/CommitLogUpgradeTestMaker.java | 2 +-
13 files changed, 389 insertions(+), 134 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 0cafa83,9752d16..70da4ad
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,18 -1,5 +1,19 @@@
-2.2.7
+3.0.7
+ * Fix Directories instantiations where CFS.initialDirectories should be used (CASSANDRA-11849)
+ * Avoid referencing DatabaseDescriptor in AbstractType (CASSANDRA-11912)
+ * Fix sstables not being protected from removal during index build (CASSANDRA-11905)
+ * cqlsh: Suppress stack trace from Read/WriteFailures (CASSANDRA-11032)
+ * Remove unneeded code to repair index summaries that have
+ been improperly down-sampled (CASSANDRA-11127)
+ * Avoid WriteTimeoutExceptions during commit log replay due to materialized
+ view lock contention (CASSANDRA-11891)
+ * Prevent OOM failures on SSTable corruption, improve tests for corruption detection (CASSANDRA-9530)
+ * Use CFS.initialDirectories when clearing snapshots (CASSANDRA-11705)
+ * Allow compaction strategies to disable early open (CASSANDRA-11754)
+ * Refactor Materialized View code (CASSANDRA-11475)
+ * Update Java Driver (CASSANDRA-11615)
+Merged from 2.2:
+ * Run CommitLog tests with different compression settings (CASSANDRA-9039)
* cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664)
* Avoid showing estimated key as -1 in tablestats (CASSANDRA-11587)
* Fix possible race condition in CommitLog.recover (CASSANDRA-11743)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index c0e12c5,460ecfe..dcdd855
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@@ -70,11 -70,10 +70,10 @@@ public class CommitLog implements Commi
final CommitLogMetrics metrics;
final AbstractCommitLogService executor;
- final ICompressor compressor;
- public ParameterizedClass compressorClass;
+ volatile Configuration configuration;
final public String location;
- static private CommitLog construct()
+ private static CommitLog construct()
{
CommitLog log = new CommitLog(DatabaseDescriptor.getCommitLogLocation(), CommitLogArchiver.construct());
@@@ -433,6 -432,14 +431,14 @@@
}
/**
+ * FOR TESTING PURPOSES.
+ */
+ public void resetConfiguration()
+ {
- this.configuration = new Configuration(DatabaseDescriptor.getCommitLogCompression());
++ configuration = new Configuration(DatabaseDescriptor.getCommitLogCompression());
+ }
+
+ /**
* FOR TESTING PURPOSES. See CommitLogAllocator
*/
public int restartUnsafe() throws IOException
@@@ -487,4 -494,59 +493,59 @@@
throw new AssertionError(DatabaseDescriptor.getCommitFailurePolicy());
}
}
+
+ public static final class Configuration
+ {
+ /**
+ * The compressor class.
+ */
+ private final ParameterizedClass compressorClass;
+
+ /**
+ * The compressor used to compress the segments.
+ */
+ private final ICompressor compressor;
+
+ public Configuration(ParameterizedClass compressorClass)
+ {
+ this.compressorClass = compressorClass;
- this.compressor = compressorClass != null ? CompressionParameters.createCompressor(compressorClass) : null;
++ this.compressor = compressorClass != null ? CompressionParams.createCompressor(compressorClass) : null;
+ }
+
+ /**
+ * Checks if the segments must be compressed.
+ * @return <code>true</code> if the segments must be compressed, <code>false</code> otherwise.
+ */
+ public boolean useCompression()
+ {
+ return compressor != null;
+ }
+
+ /**
+ * Returns the compressor used to compress the segments.
+ * @return the compressor used to compress the segments
+ */
+ public ICompressor getCompressor()
+ {
+ return compressor;
+ }
+
+ /**
+ * Returns the compressor class.
+ * @return the compressor class
+ */
+ public ParameterizedClass getCompressorClass()
+ {
+ return compressorClass;
+ }
+
+ /**
+ * Returns the compressor name.
+ * @return the compressor name.
+ */
+ public String getCompressorName()
+ {
+ return useCompression() ? compressor.getClass().getSimpleName() : "none";
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index aedc9da,ba28f3e..27c05b4
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@@ -118,22 -117,12 +118,23 @@@ public abstract class CommitLogSegmen
final CommitLog commitLog;
public final CommitLogDescriptor descriptor;
- static CommitLogSegment createSegment(CommitLog commitLog)
+ static CommitLogSegment createSegment(CommitLog commitLog, Runnable onClose)
{
- return commitLog.compressor != null ? new CompressedSegment(commitLog, onClose) : new MemoryMappedSegment(commitLog);
- return commitLog.configuration.useCompression() ? new CompressedSegment(commitLog)
++ return commitLog.configuration.useCompression() ? new CompressedSegment(commitLog, onClose)
+ : new MemoryMappedSegment(commitLog);
}
+ /**
+ * Checks if the segments use a buffer pool.
+ *
+ * @param commitLog the commit log
+ * @return <code>true</code> if the segments use a buffer pool, <code>false</code> otherwise.
+ */
+ static boolean usesBufferPool(CommitLog commitLog)
+ {
- return commitLog.compressor != null;
++ return commitLog.configuration.useCompression();
+ }
+
static long getNextId()
{
return idBase + nextId.getAndIncrement();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index 2ee4eed,8670fd7..66ad6a3
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@@ -21,11 -21,11 +21,9 @@@ import java.io.File
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
--import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
--import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
@@@ -34,24 -34,25 +32,26 @@@ import java.util.concurrent.LinkedBlock
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
--import com.google.common.annotations.VisibleForTesting;
--import com.google.common.collect.Iterables;
--import com.google.common.util.concurrent.*;
-
--import org.slf4j.Logger;
--import org.slf4j.LoggerFactory;
--
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
++import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
import org.apache.cassandra.io.util.FileUtils;
--import org.apache.cassandra.utils.Pair;
--import org.apache.cassandra.utils.concurrent.WaitQueue;
import org.apache.cassandra.utils.JVMStabilityInspector;
++import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;
++import org.apache.cassandra.utils.concurrent.WaitQueue;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
--import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
++import com.google.common.annotations.VisibleForTesting;
++import com.google.common.collect.Iterables;
++import com.google.common.util.concurrent.Futures;
++import com.google.common.util.concurrent.ListenableFuture;
++import com.google.common.util.concurrent.Runnables;
++import com.google.common.util.concurrent.Uninterruptibles;
/**
* Performs eager-creation of commit log segments in a background thread. All the
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index 0ec0bca,219709b..c73a30a
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@@ -66,11 -58,10 +66,11 @@@ public class CompressedSegment extends
/**
* Constructs a new segment file.
*/
- CompressedSegment(CommitLog commitLog)
+ CompressedSegment(CommitLog commitLog, Runnable onClose)
{
super(commitLog);
- this.compressor = commitLog.compressor;
+ this.compressor = commitLog.configuration.getCompressor();
+ this.onClose = onClose;
try
{
channel.write((ByteBuffer) buffer.duplicate().flip());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
index e24af0f,0000000..d06c112
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerFlushedTest.java
@@@ -1,95 -1,0 +1,131 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.db;
+
++import java.io.IOException;
++import java.util.Arrays;
++import java.util.Collection;
++import java.util.Collections;
++
++import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
++import org.junit.runner.RunWith;
++import org.junit.runners.Parameterized;
++import org.junit.runners.Parameterized.Parameters;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.SchemaLoader;
- import org.apache.cassandra.db.compaction.CompactionManager;
++import org.apache.cassandra.config.DatabaseDescriptor;
++import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.db.commitlog.CommitLog;
++import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.exceptions.ConfigurationException;
++import org.apache.cassandra.io.compress.DeflateCompressor;
++import org.apache.cassandra.io.compress.LZ4Compressor;
++import org.apache.cassandra.io.compress.SnappyCompressor;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.SchemaKeyspace;
+import org.apache.cassandra.utils.FBUtilities;
+
++@RunWith(Parameterized.class)
+public class RecoveryManagerFlushedTest
+{
+ private static Logger logger = LoggerFactory.getLogger(RecoveryManagerFlushedTest.class);
+
+ private static final String KEYSPACE1 = "RecoveryManager2Test";
+ private static final String CF_STANDARD1 = "Standard1";
+ private static final String CF_STANDARD2 = "Standard2";
+
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE1,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1),
+ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2));
+ }
+
++ public RecoveryManagerFlushedTest(ParameterizedClass commitLogCompression)
++ {
++ DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
++ }
++
++ @Before
++ public void setUp() throws IOException
++ {
++ CommitLog.instance.resetUnsafe(true);
++ }
++
++ @Parameters()
++ public static Collection<Object[]> generateData()
++ {
++ return Arrays.asList(new Object[][] {
++ { null }, // No compression
++ { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
++ { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
++ { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
++ }
++
+ @Test
+ /* test that commit logs do not replay flushed data */
+ public void testWithFlush() throws Exception
+ {
+ // Flush everything that may be in the commit log now to start fresh
+ FBUtilities.waitOnFutures(Keyspace.open(SystemKeyspace.NAME).flush());
+ FBUtilities.waitOnFutures(Keyspace.open(SchemaKeyspace.NAME).flush());
+
+
+ CompactionManager.instance.disableAutoCompaction();
+
+ // add a row to another CF so we test skipping mutations within a not-entirely-flushed CF
+ insertRow("Standard2", "key");
+
+ for (int i = 0; i < 100; i++)
+ {
+ String key = "key" + i;
+ insertRow("Standard1", key);
+ }
+
+ Keyspace keyspace1 = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore cfs = keyspace1.getColumnFamilyStore("Standard1");
+ logger.debug("forcing flush");
+ cfs.forceBlockingFlush();
+
+ logger.debug("begin manual replay");
+ // replay the commit log (nothing on Standard1 should be replayed since everything was flushed, so only the row on Standard2
+ // will be replayed)
+ int replayed = CommitLog.instance.resetUnsafe(false);
+ assert replayed == 1 : "Expecting only 1 replayed mutation, got " + replayed;
+ }
+
+ private void insertRow(String cfname, String key)
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+ new RowUpdateBuilder(cfs.metadata, 0, key)
+ .clustering("c")
+ .add("val", "val1")
+ .build()
+ .apply();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
index 9275dae,0000000..8ac7c5d
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerMissingHeaderTest.java
@@@ -1,88 -1,0 +1,120 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.File;
+import java.io.IOException;
++import java.util.Arrays;
++import java.util.Collection;
++import java.util.Collections;
+
+import org.junit.Assert;
++import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
++import org.junit.runner.RunWith;
++import org.junit.runners.Parameterized;
++import org.junit.runners.Parameterized.Parameters;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
- import org.apache.cassandra.db.rows.AbstractUnfilteredRowIterator;
- import org.apache.cassandra.db.rows.UnfilteredRowIterator;
++import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.db.commitlog.CommitLog;
++import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.exceptions.ConfigurationException;
++import org.apache.cassandra.io.compress.DeflateCompressor;
++import org.apache.cassandra.io.compress.LZ4Compressor;
++import org.apache.cassandra.io.compress.SnappyCompressor;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.schema.KeyspaceParams;
+
++@RunWith(Parameterized.class)
+public class RecoveryManagerMissingHeaderTest
+{
+ private static final String KEYSPACE1 = "RecoveryManager3Test1";
+ private static final String CF_STANDARD1 = "Standard1";
+
+ private static final String KEYSPACE2 = "RecoveryManager3Test2";
+ private static final String CF_STANDARD3 = "Standard3";
+
++ public RecoveryManagerMissingHeaderTest(ParameterizedClass commitLogCompression)
++ {
++ DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
++ }
++
++ @Before
++ public void setUp() throws IOException
++ {
++ CommitLog.instance.resetUnsafe(true);
++ }
++
++ @Parameters()
++ public static Collection<Object[]> generateData()
++ {
++ return Arrays.asList(new Object[][] {
++ { null }, // No compression
++ { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
++ { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
++ { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
++ }
++
+ @BeforeClass
+ public static void defineSchema() throws ConfigurationException
+ {
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace(KEYSPACE1,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1));
+ SchemaLoader.createKeyspace(KEYSPACE2,
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD(KEYSPACE2, CF_STANDARD3));
+ }
+
+ @Test
+ public void testMissingHeader() throws IOException
+ {
+ Keyspace keyspace1 = Keyspace.open(KEYSPACE1);
+ Keyspace keyspace2 = Keyspace.open(KEYSPACE2);
+
+ DecoratedKey dk = Util.dk("keymulti");
+ UnfilteredRowIterator upd1 = Util.apply(new RowUpdateBuilder(keyspace1.getColumnFamilyStore(CF_STANDARD1).metadata, 1L, 0, "keymulti")
+ .clustering("col1").add("val", "1")
+ .build());
+
+ UnfilteredRowIterator upd2 = Util.apply(new RowUpdateBuilder(keyspace2.getColumnFamilyStore(CF_STANDARD3).metadata, 1L, 0, "keymulti")
+ .clustering("col1").add("val", "1")
+ .build());
+
+ keyspace1.getColumnFamilyStore("Standard1").clearUnsafe();
+ keyspace2.getColumnFamilyStore("Standard3").clearUnsafe();
+
+ // nuke the header
+ for (File file : new File(DatabaseDescriptor.getCommitLogLocation()).listFiles())
+ {
+ if (file.getName().endsWith(".header"))
+ FileUtils.deleteWithConfirm(file);
+ }
+
+ CommitLog.instance.resetUnsafe(false);
+
+ Assert.assertTrue(Util.equal(upd1, Util.getOnlyPartitionUnfiltered(Util.cmd(keyspace1.getColumnFamilyStore(CF_STANDARD1), dk).build()).unfilteredIterator()));
+ Assert.assertTrue(Util.equal(upd2, Util.getOnlyPartitionUnfiltered(Util.cmd(keyspace2.getColumnFamilyStore(CF_STANDARD3), dk).build()).unfilteredIterator()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
index baf9466,5676b99..397030a
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
@@@ -22,35 -25,34 +25,41 @@@ import java.util.Collections
import java.util.Date;
import java.util.concurrent.TimeUnit;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import org.apache.cassandra.OrderedJUnit4ClassRunner;
- import org.apache.cassandra.Util;
- import org.apache.cassandra.config.ColumnDefinition;
- import org.apache.cassandra.db.rows.*;
- import org.apache.cassandra.db.context.CounterContext;
- import org.apache.cassandra.exceptions.ConfigurationException;
-
import org.junit.Assert;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
+ import org.junit.runners.Parameterized;
+ import org.junit.runners.Parameterized.Parameters;
- import static org.junit.Assert.assertEquals;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.Util;
++import org.apache.cassandra.config.ColumnDefinition;
+ import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.KSMetaData;
+ import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.CommitLogArchiver;
-import org.apache.cassandra.db.marshal.CounterColumnType;
++import org.apache.cassandra.db.context.CounterContext;
++import org.apache.cassandra.db.rows.Row;
++import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+ import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.compress.DeflateCompressor;
+ import org.apache.cassandra.io.compress.LZ4Compressor;
+ import org.apache.cassandra.io.compress.SnappyCompressor;
-import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.ByteBufferUtil;
- @RunWith(OrderedJUnit4ClassRunner.class)
-import static org.apache.cassandra.Util.cellname;
-import static org.apache.cassandra.Util.column;
-import static org.apache.cassandra.db.KeyspaceTest.assertColumns;
++import static org.junit.Assert.assertEquals;
+
+ @RunWith(Parameterized.class)
public class RecoveryManagerTest
{
+ private static Logger logger = LoggerFactory.getLogger(RecoveryManagerTest.class);
+
private static final String KEYSPACE1 = "RecoveryManagerTest1";
private static final String CF_STANDARD1 = "Standard1";
private static final String CF_COUNTER1 = "Counter1";
@@@ -71,13 -75,20 +80,27 @@@
SchemaLoader.standardCFMD(KEYSPACE2, CF_STANDARD3));
}
+ @Before
+ public void clearData()
+ {
+ Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1).truncateBlocking();
+ Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_COUNTER1).truncateBlocking();
+ Keyspace.open(KEYSPACE2).getColumnFamilyStore(CF_STANDARD3).truncateBlocking();
+ }
+ public RecoveryManagerTest(ParameterizedClass commitLogCompression)
+ {
+ DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
+ }
+
+ @Parameters()
+ public static Collection<Object[]> generateData()
+ {
+ return Arrays.asList(new Object[][] {
+ { null }, // No compression
- { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.<String, String>emptyMap()) },
- { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.<String, String>emptyMap()) },
- { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap()) } });
++ { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
++ { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
++ { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
+ }
@Test
public void testNothingToRecover() throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
index 7c8ab7d,769316f..5a59f1c
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
@@@ -18,18 -18,33 +18,30 @@@
*/
package org.apache.cassandra.db;
-import static org.apache.cassandra.Util.column;
-import static org.junit.Assert.*;
-
import java.io.IOException;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Collections;
+
+ import org.junit.Before;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ import org.junit.runner.RunWith;
+ import org.junit.runners.Parameterized;
+ import org.junit.runners.Parameterized.Parameters;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
+ import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.KSMetaData;
+ import org.apache.cassandra.config.ParameterizedClass;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.exceptions.ConfigurationException;
+ import org.apache.cassandra.io.compress.DeflateCompressor;
+ import org.apache.cassandra.io.compress.LZ4Compressor;
+ import org.apache.cassandra.io.compress.SnappyCompressor;
-import org.apache.cassandra.locator.SimpleStrategy;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.schema.KeyspaceParams;
+
- import org.junit.BeforeClass;
- import org.junit.Test;
-
- import static org.junit.Assert.*;
++import static org.junit.Assert.assertTrue;
/**
* Test for the truncate operation.
@@@ -38,7 -54,29 +51,28 @@@ public class RecoveryManagerTruncateTes
{
private static final String KEYSPACE1 = "RecoveryManagerTruncateTest";
private static final String CF_STANDARD1 = "Standard1";
- private static final String CF_STANDARD2 = "Standard2";
+ public RecoveryManagerTruncateTest(ParameterizedClass commitLogCompression)
+ {
+ DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
+ }
+
+ @Before
+ public void setUp() throws IOException
+ {
+ CommitLog.instance.resetUnsafe(true);
+ }
+
+ @Parameters()
+ public static Collection<Object[]> generateData()
+ {
+ return Arrays.asList(new Object[][] {
+ { null }, // No compression
- { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.<String, String>emptyMap()) },
- { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.<String, String>emptyMap()) },
- { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap()) } });
++ { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
++ { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
++ { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
+ }
+
@BeforeClass
public static void defineSchema() throws ConfigurationException
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
index 0000000,8d63959..898c19f
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogDescriptorTest.java
@@@ -1,0 -1,103 +1,102 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.cassandra.db.commitlog;
+
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
+ import java.util.HashMap;
+ import java.util.Map;
+
+ import com.google.common.collect.ImmutableMap;
+
+ import org.junit.Test;
+
+ import org.apache.cassandra.config.ParameterizedClass;
+ import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.util.ByteBufferDataInput;
-import org.apache.cassandra.io.util.FileDataInput;
++import org.apache.cassandra.io.util.DataInputBuffer;
+ import org.apache.cassandra.net.MessagingService;
+
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertTrue;
+ import static org.junit.Assert.fail;
+
+ public class CommitLogDescriptorTest
+ {
+ @Test
+ public void testVersions()
+ {
+ assertTrue(CommitLogDescriptor.isValid("CommitLog-1340512736956320000.log"));
+ assertTrue(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000.log"));
+ assertFalse(CommitLogDescriptor.isValid("CommitLog--1340512736956320000.log"));
+ assertFalse(CommitLogDescriptor.isValid("CommitLog--2-1340512736956320000.log"));
+ assertFalse(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000-123.log"));
+
+ assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-2-1340512736956320000.log").id);
+
+ assertEquals(MessagingService.current_version, new CommitLogDescriptor(1340512736956320000L, null).getMessagingVersion());
+ String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log";
+ assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion());
+ }
+
+ private void testDescriptorPersistence(CommitLogDescriptor desc) throws IOException
+ {
+ ByteBuffer buf = ByteBuffer.allocate(1024);
+ CommitLogDescriptor.writeHeader(buf, desc);
- long length = buf.position();
+ // Put some extra data in the stream.
+ buf.putDouble(0.1);
+ buf.flip();
- try (FileDataInput input = new ByteBufferDataInput(buf, "input", 0, 0))
++
++ try (DataInputBuffer input = new DataInputBuffer(buf, false))
+ {
+ CommitLogDescriptor read = CommitLogDescriptor.readHeader(input);
- assertEquals("Descriptor length", length, input.getFilePointer());
+ assertEquals("Descriptors", desc, read);
+ }
+ }
+
+ @Test
+ public void testDescriptorPersistence() throws IOException
+ {
+ testDescriptorPersistence(new CommitLogDescriptor(11, null));
+ testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_21, 13, null));
- testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 15, null));
- testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 17, new ParameterizedClass("LZ4Compressor", null)));
- testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 19,
++ testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 15, null));
++ testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 17, new ParameterizedClass("LZ4Compressor", null)));
++ testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 19,
+ new ParameterizedClass("StubbyCompressor", ImmutableMap.of("parameter1", "value1", "flag2", "55", "argument3", "null"))));
+ }
+
+ @Test
+ public void testDescriptorInvalidParametersSize() throws IOException
+ {
- Map<String, String> params = new HashMap<>();
- for (int i=0; i<6000; ++i)
++ final int numberOfParameters = 65535;
++ Map<String, String> params = new HashMap<>(numberOfParameters);
++ for (int i=0; i<numberOfParameters; ++i)
+ params.put("key"+i, Integer.toString(i, 16));
+ try {
- CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_22,
++ CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_30,
+ 21,
+ new ParameterizedClass("LZ4Compressor", params));
+ ByteBuffer buf = ByteBuffer.allocate(1024000);
+ CommitLogDescriptor.writeHeader(buf, desc);
+ fail("Parameter object too long should fail on writing descriptor.");
+ } catch (ConfigurationException e)
+ {
+ // correct path
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 555cdda,9999b42..39ba886
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -16,23 -16,15 +16,20 @@@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.cassandra.db.commitlog;
- import static junit.framework.Assert.assertTrue;
- import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
- import static org.junit.Assert.assertEquals;
-
-import java.io.*;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
- import java.util.HashMap;
- import java.util.Map;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Collections;
import java.util.UUID;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
@@@ -50,31 -39,54 +44,60 @@@ import org.junit.runners.Parameterized.
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
- import org.apache.cassandra.config.Config.CommitFailurePolicy;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.config.ParameterizedClass;
- import org.apache.cassandra.db.commitlog.CommitLog;
- import org.apache.cassandra.db.commitlog.CommitLogDescriptor;
- import org.apache.cassandra.db.commitlog.ReplayPosition;
- import org.apache.cassandra.db.commitlog.CommitLogSegment;
-import org.apache.cassandra.db.*;
++import org.apache.cassandra.db.ColumnFamilyStore;
++import org.apache.cassandra.db.Keyspace;
++import org.apache.cassandra.db.Mutation;
++import org.apache.cassandra.db.RowUpdateBuilder;
import org.apache.cassandra.db.commitlog.CommitLogReplayer.CommitLogReplayException;
import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.filter.NamesQueryFilter;
++import org.apache.cassandra.db.marshal.AsciiType;
++import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.exceptions.ConfigurationException;
- import org.apache.cassandra.io.util.DataInputBuffer;
+ import org.apache.cassandra.io.compress.DeflateCompressor;
+ import org.apache.cassandra.io.compress.LZ4Compressor;
+ import org.apache.cassandra.io.compress.SnappyCompressor;
-import org.apache.cassandra.locator.SimpleStrategy;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.*;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.KillerForTests;
+import org.apache.cassandra.utils.vint.VIntCoding;
+ import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
++import static org.junit.Assert.assertEquals;
++import static org.junit.Assert.assertTrue;
+
+ @RunWith(Parameterized.class)
public class CommitLogTest
{
private static final String KEYSPACE1 = "CommitLogTest";
private static final String KEYSPACE2 = "CommitLogTestNonDurable";
- private static final String CF1 = "Standard1";
- private static final String CF2 = "Standard2";
+ private static final String STANDARD1 = "Standard1";
+ private static final String STANDARD2 = "Standard2";
+ public CommitLogTest(ParameterizedClass commitLogCompression)
+ {
+ DatabaseDescriptor.setCommitLogCompression(commitLogCompression);
+ }
+
+ @Before
+ public void setUp() throws IOException
+ {
+ CommitLog.instance.resetUnsafe(true);
+ }
+
+ @Parameters()
+ public static Collection<Object[]> generateData()
+ {
+ return Arrays.asList(new Object[][] {
+ { null }, // No compression
- { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.<String, String>emptyMap()) },
- { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.<String, String>emptyMap()) },
- { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.<String, String>emptyMap()) } });
++ { new ParameterizedClass(LZ4Compressor.class.getName(), Collections.emptyMap()) },
++ { new ParameterizedClass(SnappyCompressor.class.getName(), Collections.emptyMap()) },
++ { new ParameterizedClass(DeflateCompressor.class.getName(), Collections.emptyMap()) } });
+ }
+
@BeforeClass
public static void defineSchema() throws ConfigurationException
{
@@@ -186,29 -208,21 +209,28 @@@
@Test
public void testDontDeleteIfDirty() throws Exception
{
- CommitLog.instance.resetUnsafe(true);
+ ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+ ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2);
+
// Roughly 32 MB mutation
- Mutation rm = new Mutation(KEYSPACE1, bytes("k"));
- rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize()/4), 0);
+ Mutation m = new RowUpdateBuilder(cfs1.metadata, 0, "k")
+ .clustering("bytes")
+ .add("val", ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize()/4))
+ .build();
// Adding it 5 times
- CommitLog.instance.add(rm);
- CommitLog.instance.add(rm);
- CommitLog.instance.add(rm);
- CommitLog.instance.add(rm);
- CommitLog.instance.add(rm);
+ CommitLog.instance.add(m);
+ CommitLog.instance.add(m);
+ CommitLog.instance.add(m);
+ CommitLog.instance.add(m);
+ CommitLog.instance.add(m);
// Adding new mutation on another CF
- Mutation rm2 = new Mutation(KEYSPACE1, bytes("k"));
- rm2.add(CF2, Util.cellname("c1"), ByteBuffer.allocate(4), 0);
- CommitLog.instance.add(rm2);
+ Mutation m2 = new RowUpdateBuilder(cfs2.metadata, 0, "k")
+ .clustering("bytes")
+ .add("val", ByteBuffer.allocate(4))
+ .build();
+ CommitLog.instance.add(m2);
assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments();
@@@ -222,16 -236,10 +244,14 @@@
@Test
public void testDeleteIfNotDirty() throws Exception
{
-- DatabaseDescriptor.getCommitLogSegmentSize();
- CommitLog.instance.resetUnsafe(true);
+ ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+ ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2);
+
// Roughly 32 MB mutation
- Mutation rm = new Mutation(KEYSPACE1, bytes("k"));
- rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/4) - 1), 0);
+ Mutation rm = new RowUpdateBuilder(cfs1.metadata, 0, "k")
+ .clustering("bytes")
+ .add("val", ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/4) - 1))
+ .build();
// Adding it twice (won't change segment)
CommitLog.instance.add(rm);
@@@ -302,12 -294,8 +322,12 @@@
@Test
public void testEqualRecordLimit() throws Exception
{
- CommitLog.instance.resetUnsafe(true);
- Mutation rm = new Mutation(KEYSPACE1, bytes("k"));
- rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate(getMaxRecordDataSize()), 0);
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+ Mutation rm = new RowUpdateBuilder(cfs.metadata, 0, "k")
+ .clustering("bytes")
+ .add("val", ByteBuffer.allocate(getMaxRecordDataSize()))
+ .build();
++
CommitLog.instance.add(rm);
}
@@@ -448,129 -432,57 +468,69 @@@
}
@Test
- public void testVersions()
- {
- Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-1340512736956320000.log"));
- Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000.log"));
- Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--1340512736956320000.log"));
- Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--2-1340512736956320000.log"));
- Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000-123.log"));
-
- assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-2-1340512736956320000.log").id);
-
- assertEquals(MessagingService.current_version, new CommitLogDescriptor(1340512736956320000L, null).getMessagingVersion());
- String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log";
- assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion());
- }
-
- @Test
public void testTruncateWithoutSnapshot() throws ExecutionException, InterruptedException, IOException
{
- boolean prev = DatabaseDescriptor.isAutoSnapshot();
- DatabaseDescriptor.setAutoSnapshot(false);
- ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard1");
- ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard2");
-
- final Mutation rm1 = new Mutation(KEYSPACE1, bytes("k"));
- rm1.add("Standard1", Util.cellname("c1"), ByteBuffer.allocate(100), 0);
- rm1.apply();
- cfs1.truncateBlocking();
- DatabaseDescriptor.setAutoSnapshot(prev);
- final Mutation rm2 = new Mutation(KEYSPACE1, bytes("k"));
- rm2.add("Standard2", Util.cellname("c1"), ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize() / 4), 0);
-
- for (int i = 0 ; i < 5 ; i++)
- CommitLog.instance.add(rm2);
-
- Assert.assertEquals(2, CommitLog.instance.activeSegments());
- ReplayPosition position = CommitLog.instance.getContext();
- for (Keyspace ks : Keyspace.system())
- for (ColumnFamilyStore syscfs : ks.getColumnFamilyStores())
- CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, position);
- CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId, position);
- Assert.assertEquals(1, CommitLog.instance.activeSegments());
+ boolean originalState = DatabaseDescriptor.isAutoSnapshot();
+ try
+ {
+ CommitLog.instance.resetUnsafe(true);
+ boolean prev = DatabaseDescriptor.isAutoSnapshot();
+ DatabaseDescriptor.setAutoSnapshot(false);
+ ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD1);
+ ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore(STANDARD2);
+
+ new RowUpdateBuilder(cfs1.metadata, 0, "k").clustering("bytes").add("val", ByteBuffer.allocate(100)).build().applyUnsafe();
+ cfs1.truncateBlocking();
+ DatabaseDescriptor.setAutoSnapshot(prev);
+ Mutation m2 = new RowUpdateBuilder(cfs2.metadata, 0, "k")
+ .clustering("bytes")
+ .add("val", ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize() / 4))
+ .build();
+
+ for (int i = 0 ; i < 5 ; i++)
+ CommitLog.instance.add(m2);
+
+ assertEquals(2, CommitLog.instance.activeSegments());
+ ReplayPosition position = CommitLog.instance.getContext();
+ for (Keyspace ks : Keyspace.system())
+ for (ColumnFamilyStore syscfs : ks.getColumnFamilyStores())
+ CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, position);
+ CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId, position);
+ assertEquals(1, CommitLog.instance.activeSegments());
+ }
+ finally
+ {
+ DatabaseDescriptor.setAutoSnapshot(originalState);
+ }
}
@Test
public void testTruncateWithoutSnapshotNonDurable() throws IOException
{
- CommitLog.instance.resetUnsafe(true);
- boolean prevAutoSnapshot = DatabaseDescriptor.isAutoSnapshot();
- DatabaseDescriptor.setAutoSnapshot(false);
- Keyspace notDurableKs = Keyspace.open(KEYSPACE2);
- Assert.assertFalse(notDurableKs.getMetadata().durableWrites);
- ColumnFamilyStore cfs = notDurableKs.getColumnFamilyStore("Standard1");
- CellNameType type = notDurableKs.getColumnFamilyStore("Standard1").getComparator();
- Mutation rm;
- DecoratedKey dk = Util.dk("key1");
-
- // add data
- rm = new Mutation(KEYSPACE2, dk.getKey());
- rm.add("Standard1", Util.cellname("Column1"), ByteBufferUtil.bytes("abcd"), 0);
- rm.apply();
-
- ReadCommand command = new SliceByNamesReadCommand(KEYSPACE2, dk.getKey(), "Standard1", System.currentTimeMillis(), new NamesQueryFilter(FBUtilities.singleton(Util.cellname("Column1"), type)));
- Row row = command.getRow(notDurableKs);
- Cell col = row.cf.getColumn(Util.cellname("Column1"));
- Assert.assertEquals(col.value(), ByteBuffer.wrap("abcd".getBytes()));
- cfs.truncateBlocking();
- DatabaseDescriptor.setAutoSnapshot(prevAutoSnapshot);
- row = command.getRow(notDurableKs);
- Assert.assertEquals(null, row.cf);
+ boolean originalState = DatabaseDescriptor.getAutoSnapshot();
+ try
+ {
+ DatabaseDescriptor.setAutoSnapshot(false);
+ Keyspace notDurableKs = Keyspace.open(KEYSPACE2);
+ Assert.assertFalse(notDurableKs.getMetadata().params.durableWrites);
+
+ ColumnFamilyStore cfs = notDurableKs.getColumnFamilyStore("Standard1");
+ new RowUpdateBuilder(cfs.metadata, 0, "key1")
+ .clustering("bytes").add("val", ByteBufferUtil.bytes("abcd"))
+ .build()
+ .applyUnsafe();
+
+ assertTrue(Util.getOnlyRow(Util.cmd(cfs).columns("val").build())
+ .cells().iterator().next().value().equals(ByteBufferUtil.bytes("abcd")));
+
+ cfs.truncateBlocking();
+
+ Util.assertEmpty(Util.cmd(cfs).columns("val").build());
+ }
+ finally
+ {
+ DatabaseDescriptor.setAutoSnapshot(originalState);
+ }
}
-
- private void testDescriptorPersistence(CommitLogDescriptor desc) throws IOException
- {
- ByteBuffer buf = ByteBuffer.allocate(1024);
- CommitLogDescriptor.writeHeader(buf, desc);
- // Put some extra data in the stream.
- buf.putDouble(0.1);
- buf.flip();
-
- DataInputBuffer input = new DataInputBuffer(buf, false);
- CommitLogDescriptor read = CommitLogDescriptor.readHeader(input);
- Assert.assertEquals("Descriptors", desc, read);
- }
-
- @Test
- public void testDescriptorPersistence() throws IOException
- {
- testDescriptorPersistence(new CommitLogDescriptor(11, null));
- testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_21, 13, null));
- testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 15, null));
- testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 17, new ParameterizedClass("LZ4Compressor", null)));
- testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_30, 19,
- new ParameterizedClass("StubbyCompressor", ImmutableMap.of("parameter1", "value1", "flag2", "55", "argument3", "null"))));
- }
-
- @Test
- public void testDescriptorInvalidParametersSize() throws IOException
- {
- Map<String, String> params = new HashMap<>();
- for (int i=0; i<65535; ++i)
- params.put("key"+i, Integer.toString(i, 16));
- try {
- CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_30,
- 21,
- new ParameterizedClass("LZ4Compressor", params));
- ByteBuffer buf = ByteBuffer.allocate(1024000);
- CommitLogDescriptor.writeHeader(buf, desc);
- Assert.fail("Parameter object too long should fail on writing descriptor.");
- } catch (ConfigurationException e)
- {
- // correct path
- }
- }
}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e826951/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTestMaker.java
----------------------------------------------------------------------