You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by st...@apache.org on 2016/05/26 02:57:20 UTC
[03/10] cassandra git commit: Prevent OOM failures on SSTable corruption, improve tests for corruption detection
Prevent OOM failures on SSTable corruption, improve tests for corruption detection
Patch by Alex Petrov; reviewed by Stefania Alborghetti for CASSANDRA-9530
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/85cc3901
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/85cc3901
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/85cc3901
Branch: refs/heads/trunk
Commit: 85cc390189f45be54dec9b146f66eeb7737fb0eb
Parents: e000ebb
Author: Alex Petrov <ol...@gmail.com>
Authored: Tue May 24 09:46:43 2016 +0200
Committer: Stefania Alborghetti <st...@datastax.com>
Committed: Thu May 26 10:51:06 2016 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 11 +
conf/cassandra.yaml | 8 +-
.../org/apache/cassandra/config/Config.java | 6 +
.../cassandra/config/DatabaseDescriptor.java | 8 +
.../apache/cassandra/db/ClusteringPrefix.java | 4 +-
.../columniterator/AbstractSSTableIterator.java | 2 +-
.../cassandra/db/marshal/AbstractType.java | 46 +++-
src/java/org/apache/cassandra/db/rows/Rows.java | 2 +-
.../cassandra/db/rows/UnfilteredSerializer.java | 13 +-
.../io/sstable/SSTableIdentityIterator.java | 5 +
.../io/sstable/format/big/BigTableScanner.java | 4 +-
.../io/util/RebufferingInputStream.java | 2 +-
.../compaction/BlacklistingCompactionsTest.java | 45 +++-
.../sstable/SSTableCorruptionDetectionTest.java | 244 +++++++++++++++++++
.../cassandra/io/sstable/SSTableWriterTest.java | 49 +++-
.../io/sstable/SSTableWriterTestBase.java | 10 +-
17 files changed, 431 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/85cc3901/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9a449fd..ddfb24f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.7
+ * Prevent OOM failures on SSTable corruption, improve tests for corruption detection (CASSANDRA-9530)
* Use CFS.initialDirectories when clearing snapshots (CASSANDRA-11705)
* Allow compaction strategies to disable early open (CASSANDRA-11754)
* Refactor Materialized View code (CASSANDRA-11475)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/85cc3901/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index d13f94f..ac1ef17 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -13,6 +13,17 @@ restore snapshots created with the previous major version using the
'sstableloader' tool. You can upgrade the file format of your snapshots
using the provided 'sstableupgrade' tool.
+3.0.7
+=====
+
+Upgrading
+---------
+ - A maximum size for SSTable values has been introduced, to prevent out-of-memory
+ exceptions when reading corrupt SSTables. This maximum size can be set via
+ max_value_size_in_mb in cassandra.yaml. The default is 256MB, which matches the default
+ value of native_transport_max_frame_size_in_mb. SSTables will be considered corrupt if
+ they contain values whose size exceeds this limit. See CASSANDRA-9530 for more details.
+
3.0.6
=====
http://git-wip-us.apache.org/repos/asf/cassandra/blob/85cc3901/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index a460abd..4b92f64 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -512,7 +512,8 @@ native_transport_port: 9042
# native_transport_max_threads: 128
#
# The maximum size of allowed frame. Frame (requests) larger than this will
-# be rejected as invalid. The default is 256MB.
+# be rejected as invalid. The default is 256MB. If you're changing this parameter,
+# you may want to adjust max_value_size_in_mb accordingly.
# native_transport_max_frame_size_in_mb: 256
# The maximum number of concurrent client connections.
@@ -944,3 +945,8 @@ enable_scripted_user_defined_functions: false
# below their system default. The sysinternals 'clockres' tool can confirm your system's default
# setting.
windows_timer_interval: 1
+
+# Maximum size of any value in SSTables. Safety measure to detect SSTable corruption
+# early. Any value size larger than this threshold will result in the SSTable being
+# marked as corrupted.
+# max_value_size_in_mb: 256
http://git-wip-us.apache.org/repos/asf/cassandra/blob/85cc3901/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 235641c..b49e14c 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -152,6 +152,12 @@ public class Config
@Deprecated
public Integer thrift_max_message_length_in_mb = 16;
+ /**
+ * Max size of values in SSTables, in megabytes.
+ * Default is the same as the native protocol frame limit: 256MB.
+ * See AbstractType for how it is used.
+ */
+ public Integer max_value_size_in_mb = 256;
public Integer thrift_framed_transport_size_in_mb = 15;
public Boolean snapshot_before_compaction = false;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/85cc3901/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 295b827..dcda76f 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -712,6 +712,9 @@ public class DatabaseDescriptor
{
throw new ConfigurationException("Encryption must be enabled in client_encryption_options for native_transport_port_ssl", false);
}
+
+ if (conf.max_value_size_in_mb == null || conf.max_value_size_in_mb <= 0)
+ throw new ConfigurationException("max_value_size_in_mb must be positive", false);
}
private static FileStore guessFileStore(String dir) throws IOException
@@ -815,6 +818,11 @@ public class DatabaseDescriptor
return conf.thrift_framed_transport_size_in_mb * 1024 * 1024;
}
+ public static int getMaxValueSize()
+ {
+ return conf.max_value_size_in_mb * 1024 * 1024;
+ }
+
/**
* Creates all storage-related directories.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/85cc3901/src/java/org/apache/cassandra/db/ClusteringPrefix.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ClusteringPrefix.java b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
index 17befca..8d28637 100644
--- a/src/java/org/apache/cassandra/db/ClusteringPrefix.java
+++ b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
@@ -418,7 +418,9 @@ public interface ClusteringPrefix extends IMeasurableMemory, Clusterable
public void prepare(int flags, int extendedFlags) throws IOException
{
- assert !UnfilteredSerializer.isStatic(extendedFlags) : "Flags = " + flags;
+ if (UnfilteredSerializer.isStatic(extendedFlags))
+ throw new IOException("Corrupt flags value for clustering prefix (isStatic flag set): " + flags);
+
this.nextIsRow = UnfilteredSerializer.kind(flags) == Unfiltered.Kind.ROW;
this.nextKind = nextIsRow ? Kind.CLUSTERING : ClusteringPrefix.Kind.values()[in.readByte()];
this.nextSize = nextIsRow ? comparator.size() : in.readUnsignedShort();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/85cc3901/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
index 0e2012e..d57e6bc 100644
--- a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
@@ -339,7 +339,7 @@ abstract class AbstractSSTableIterator implements SliceableUnfilteredRowIterator
{
return hasNextInternal();
}
- catch (IOException e)
+ catch (IOException | IndexOutOfBoundsException e)
{
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/85cc3901/src/java/org/apache/cassandra/db/marshal/AbstractType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractType.java b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
index 331f1a4..9d1cc8a 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractType.java
@@ -27,9 +27,11 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQL3Type;
import org.apache.cassandra.cql3.Term;
import org.apache.cassandra.db.TypeSizes;
@@ -41,11 +43,9 @@ import org.apache.cassandra.utils.FastByteOperations;
import org.github.jamm.Unmetered;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.ByteBufferUtil;
import static org.apache.cassandra.db.marshal.AbstractType.ComparisonType.CUSTOM;
-import static org.apache.cassandra.db.marshal.AbstractType.ComparisonType.NOT_COMPARABLE;
/**
* Specifies a Comparator for a specific type of ByteBuffer.
@@ -62,7 +62,7 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
public final Comparator<ByteBuffer> reverseComparator;
- public static enum ComparisonType
+ public enum ComparisonType
{
/**
* This type should never be compared
@@ -82,10 +82,23 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
public final ComparisonType comparisonType;
public final boolean isByteOrderComparable;
+
+ /**
+ * The maximum size of values for this type, used when some values are not of fixed length,
+ * that is valueLengthIfFixed() returns -1.
+ */
+ public int maxValueSize;
+
protected AbstractType(ComparisonType comparisonType)
{
+ this(comparisonType, DatabaseDescriptor.getMaxValueSize());
+ }
+
+ protected AbstractType(ComparisonType comparisonType, int maxValueSize)
+ {
this.comparisonType = comparisonType;
this.isByteOrderComparable = comparisonType == ComparisonType.BYTE_ORDER;
+ this.maxValueSize = maxValueSize;
reverseComparator = (o1, o2) -> AbstractType.this.compare(o2, o1);
try
{
@@ -101,6 +114,17 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
}
}
+ /**
+ * Change the maximum value size, this should only be called for testing.
+ * Unfortunately, ensuring we use a type created with a different maxValueSize
+ * is too hard at the moment, due to the pervasive use of the type's singleton instances.
+ */
+ @VisibleForTesting
+ public void setMaxValueSize(int maxValueSize)
+ {
+ this.maxValueSize = maxValueSize;
+ }
+
public static List<String> asCQLTypeStringList(List<AbstractType<?>> abstractTypes)
{
List<String> r = new ArrayList<>(abstractTypes.size());
@@ -357,7 +381,7 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
}
/**
- * The length of values for this type if all values are of fixed length, -1 otherwise.
+ * The length of values for this type if all values are of fixed length, -1 otherwise.
*/
protected int valueLengthIfFixed()
{
@@ -385,10 +409,22 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer>
public ByteBuffer readValue(DataInputPlus in) throws IOException
{
int length = valueLengthIfFixed();
+
if (length >= 0)
return ByteBufferUtil.read(in, length);
else
- return ByteBufferUtil.readWithVIntLength(in);
+ {
+ int l = (int)in.readUnsignedVInt();
+ if (l < 0)
+ throw new IOException("Corrupt (negative) value length encountered");
+
+ if (l > maxValueSize)
+ throw new IOException(String.format("Corrupt value length %d encountered, as it exceeds the maximum of %d, " +
+ "which is set via max_value_size_in_mb in cassandra.yaml",
+ l, maxValueSize));
+
+ return ByteBufferUtil.read(in, l);
+ }
}
public void skipValue(DataInputPlus in) throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/85cc3901/src/java/org/apache/cassandra/db/rows/Rows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Rows.java b/src/java/org/apache/cassandra/db/rows/Rows.java
index ea2ca06..e325091 100644
--- a/src/java/org/apache/cassandra/db/rows/Rows.java
+++ b/src/java/org/apache/cassandra/db/rows/Rows.java
@@ -59,7 +59,7 @@ public abstract class Rows
}
/**
- * Collect statistics ont a given row.
+ * Collect statistics on a given row.
*
* @param row the row for which to collect stats.
* @param collector the stats collector.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/85cc3901/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index 4efc5eb..e4202c9 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -355,9 +355,18 @@ public class UnfilteredSerializer
}
else
{
- assert !isStatic(extendedFlags); // deserializeStaticRow should be used for that.
+ // deserializeStaticRow should be used for that.
+ if (isStatic(extendedFlags))
+ throw new IOException("Corrupt flags value for unfiltered partition (isStatic flag set): " + flags);
+
builder.newRow(Clustering.serializer.deserialize(in, helper.version, header.clusteringTypes()));
- return deserializeRowBody(in, header, helper, flags, extendedFlags, builder);
+ Row row = deserializeRowBody(in, header, helper, flags, extendedFlags, builder);
+ // we do not write empty rows because Rows.collectStats(), called by BTW.applyToRow(), asserts that rows are not empty
+ // if we don't throw here, then later the very same assertion in Rows.collectStats() will fail compactions
+ // see BlacklistingCompactionsTest and CASSANDRA-9530 for details
+ if (row.isEmpty())
+ throw new IOException("Corrupt empty row found in unfiltered partition");
+ return row;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/85cc3901/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index acdf6bb..6fbc690 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -99,6 +99,11 @@ public class SSTableIdentityIterator extends AbstractIterator<Unfiltered> implem
{
return iterator.hasNext() ? iterator.next() : endOfData();
}
+ catch (IndexOutOfBoundsException e)
+ {
+ sstable.markSuspect();
+ throw new CorruptSSTableException(e, filename);
+ }
catch (IOError e)
{
if (e.getCause() instanceof IOException)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/85cc3901/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
index 717cfdc..a3bd442 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
@@ -329,8 +329,8 @@ public class BigTableScanner implements ISSTableScanner
{
if (dataRange == null)
{
- dfile.seek(currentEntry.position + currentEntry.headerOffset());
- ByteBufferUtil.readWithShortLength(dfile); // key
+ dfile.seek(currentEntry.position);
+ ByteBufferUtil.skipShortLength(dfile); // key
return new SSTableIdentityIterator(sstable, dfile, partitionKey());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/85cc3901/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java b/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
index 3068746..15d0975 100644
--- a/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
+++ b/src/java/org/apache/cassandra/io/util/RebufferingInputStream.java
@@ -65,7 +65,7 @@ public abstract class RebufferingInputStream extends InputStream implements Data
{
int read = read(b, off, len);
if (read < len)
- throw new EOFException();
+ throw new EOFException("EOF after " + read + " bytes out of " + len);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/85cc3901/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
index 19d2347..21ce450 100644
--- a/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
@@ -28,25 +28,32 @@ import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
+import org.apache.cassandra.config.*;
import org.apache.cassandra.db.*;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.*;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class BlacklistingCompactionsTest
{
+ private static final Logger logger = LoggerFactory.getLogger(BlacklistingCompactionsTest.class);
+
+ private static Random random;
+
private static final String KEYSPACE1 = "BlacklistingCompactionsTest";
- private static final String CF_STANDARD1 = "Standard1";
+ private static final String STANDARD_STCS = "Standard_STCS";
+ private static final String STANDARD_LCS = "Standard_LCS";
@After
public void leakDetect() throws InterruptedException
@@ -60,10 +67,26 @@ public class BlacklistingCompactionsTest
@BeforeClass
public static void defineSchema() throws ConfigurationException
{
+ long seed = System.nanoTime();
+ //long seed = 754271160974509L; // CASSANDRA-9530: use this seed to reproduce compaction failures if reading empty rows
+ logger.info("Seed {}", seed);
+ random = new Random(seed);
+
SchemaLoader.prepareServer();
SchemaLoader.createKeyspace(KEYSPACE1,
KeyspaceParams.simple(1),
- SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1));
+ SchemaLoader.standardCFMD(KEYSPACE1, STANDARD_STCS).compaction(CompactionParams.DEFAULT),
+ SchemaLoader.standardCFMD(KEYSPACE1, STANDARD_LCS).compaction(CompactionParams.lcs(Collections.emptyMap())));
+
+ Keyspace keyspace = Keyspace.open(KEYSPACE1);
+ for (String tableName : new String[] {STANDARD_STCS, STANDARD_LCS})
+ {
+ final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(tableName);
+
+ for (ColumnDefinition cd : cfs.metadata.allColumns())
+ cd.type.setMaxValueSize(1024 * 1024); // set max value size to 1MB
+ }
+
closeStdErr();
}
@@ -80,20 +103,20 @@ public class BlacklistingCompactionsTest
@Test
public void testBlacklistingWithSizeTieredCompactionStrategy() throws Exception
{
- testBlacklisting(SizeTieredCompactionStrategy.class.getCanonicalName());
+ testBlacklisting(STANDARD_STCS);
}
@Test
public void testBlacklistingWithLeveledCompactionStrategy() throws Exception
{
- testBlacklisting(LeveledCompactionStrategy.class.getCanonicalName());
+ testBlacklisting(STANDARD_LCS);
}
- public void testBlacklisting(String compactionStrategy) throws Exception
+ private void testBlacklisting(String tableName) throws Exception
{
// this test does enough rows to force multiple block indexes to be used
Keyspace keyspace = Keyspace.open(KEYSPACE1);
- final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
+ final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(tableName);
final int ROWS_PER_SSTABLE = 10;
final int SSTABLES = cfs.metadata.params.minIndexInterval * 2 / ROWS_PER_SSTABLE;
@@ -142,11 +165,11 @@ public class BlacklistingCompactionsTest
raf = new RandomAccessFile(sstable.getFilename(), "rw");
assertNotNull(raf);
assertTrue(raf.length() > corruptionSize);
- raf.seek(new Random().nextInt((int)(raf.length() - corruptionSize)));
+ raf.seek(random.nextInt((int)(raf.length() - corruptionSize)));
// We want to write something large enough that the corruption cannot get undetected
// (even without compression)
byte[] corruption = new byte[corruptionSize];
- Arrays.fill(corruption, (byte)0xFF);
+ random.nextBytes(corruption);
raf.write(corruption);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/85cc3901/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java
new file mode 100644
index 0000000..0c27fc8
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.function.*;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.*;
+import org.apache.cassandra.cache.*;
+import org.apache.cassandra.config.*;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.schema.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class SSTableCorruptionDetectionTest extends SSTableWriterTestBase
+{
+ private static final Logger logger = LoggerFactory.getLogger(SSTableCorruptionDetectionTest.class);
+
+ private static final int numberOfPks = 1000;
+ private static final int numberOfRuns = 100;
+ private static final int valueSize = 512 * 1024;
+ // Keep the maximum corruption size comparable to or larger than the value size; otherwise
+ // the chance of the corruption landing in the middle of a value is quite high.
+ private static final int maxCorruptionSize = 2 * 1024 * 1024;
+
+ private static final String keyspace = "SSTableCorruptionDetectionTest";
+ private static final String table = "corrupted_table";
+
+ private static Random random;
+ private static SSTableWriter writer;
+ private static LifecycleTransaction txn;
+ private static ColumnFamilyStore cfs;
+ private static SSTableReader ssTableReader;
+
+ @BeforeClass
+ public static void setUp()
+ {
+ CFMetaData cfm = CFMetaData.Builder.create(keyspace, table)
+ .addPartitionKey("pk", AsciiType.instance)
+ .addClusteringColumn("ck1", AsciiType.instance)
+ .addClusteringColumn("ck2", AsciiType.instance)
+ .addRegularColumn("reg1", BytesType.instance)
+ .addRegularColumn("reg2", BytesType.instance)
+ .build();
+
+ cfm.compression(CompressionParams.noCompression());
+ SchemaLoader.createKeyspace(keyspace,
+ KeyspaceParams.simple(1),
+ cfm);
+
+ cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);
+ cfs.disableAutoCompaction();
+
+ for (ColumnDefinition cd : cfs.metadata.allColumns())
+ cd.type.setMaxValueSize(1024 * 1024);
+
+ long seed = System.nanoTime();
+ logger.info("Seed {}", seed);
+ random = new Random(seed);
+
+ truncate(cfs);
+ File dir = cfs.getDirectories().getDirectoryForNewSSTables();
+ txn = LifecycleTransaction.offline(OperationType.WRITE);
+
+ // Setting up/writing large values is an expensive operation, we only want to do it once per run
+ writer = getWriter(cfs, dir, txn);
+ for (int i = 0; i < numberOfPks; i++)
+ {
+ UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, String.format("pkvalue_%07d", i)).withTimestamp(1);
+ byte[] reg1 = new byte[valueSize];
+ random.nextBytes(reg1);
+ byte[] reg2 = new byte[valueSize];
+ random.nextBytes(reg2);
+ builder.newRow("clustering_" + i, "clustering_" + (i + 1))
+ .add("reg1", ByteBuffer.wrap(reg1))
+ .add("reg2", ByteBuffer.wrap(reg2));
+ writer.append(builder.build().unfilteredIterator());
+ }
+ cfs.forceBlockingFlush();
+
+ ssTableReader = writer.finish(true);
+ txn.update(ssTableReader, false);
+ LifecycleTransaction.waitForDeletions();
+ }
+
+ @AfterClass
+ public static void tearDown()
+ {
+ txn.abort();
+ writer.close();
+ }
+
+ @Test
+ public void testSinglePartitionIterator() throws Throwable
+ {
+ bruteForceCorruptionTest(ssTableReader, partitionIterator());
+ }
+
+ @Test
+ public void testSSTableScanner() throws Throwable
+ {
+ bruteForceCorruptionTest(ssTableReader, sstableScanner());
+ }
+
+ private void bruteForceCorruptionTest(SSTableReader ssTableReader, Consumer<SSTableReader> walker) throws Throwable
+ {
+ RandomAccessFile raf = new RandomAccessFile(ssTableReader.getFilename(), "rw");
+
+ int corruptedCounter = 0;
+
+ int fileLength = (int)raf.length(); // in current test, it does fit into int
+ for (int i = 0; i < numberOfRuns; i++)
+ {
+ final int corruptionPosition = random.nextInt(fileLength - 1); //ensure at least one byte will be corrupted
+ // corrupt max from position to end of file
+ final int corruptionSize = Math.min(maxCorruptionSize, random.nextInt(fileLength - corruptionPosition));
+
+ byte[] backup = corruptSstable(raf, corruptionPosition, corruptionSize);
+
+ try
+ {
+ walker.accept(ssTableReader);
+ }
+ catch (CorruptSSTableException t)
+ {
+ corruptedCounter++;
+ }
+ finally
+ {
+ restore(raf, corruptionPosition, backup);
+ }
+ }
+
+ assertTrue(corruptedCounter > 0);
+ FileUtils.closeQuietly(raf);
+ }
+
+ private Consumer<SSTableReader> sstableScanner()
+ {
+ return (SSTableReader sstable) -> {
+ try (ISSTableScanner scanner = sstable.getScanner())
+ {
+ while (scanner.hasNext())
+ {
+ try (UnfilteredRowIterator rowIter = scanner.next())
+ {
+ if (rowIter.hasNext())
+ {
+ Unfiltered unfiltered = rowIter.next();
+ if (unfiltered.isRow())
+ {
+ Row row = (Row) unfiltered;
+ assertEquals(2, row.clustering().size());
+ // no-op read
+ }
+ }
+ }
+
+ }
+ }
+ };
+ }
+
+ private Consumer<SSTableReader> partitionIterator()
+ {
+ return (SSTableReader sstable) -> {
+ for (int i = 0; i < numberOfPks; i++)
+ {
+ DecoratedKey dk = Util.dk(String.format("pkvalue_%07d", i));
+ try (UnfilteredRowIterator rowIter = sstable.iterator(dk, ColumnFilter.all(cfs.metadata), false, false))
+ {
+ while (rowIter.hasNext())
+ {
+ Unfiltered unfiltered = rowIter.next();
+ if (unfiltered.isRow())
+ {
+ Row row = (Row) unfiltered;
+ assertEquals(2, row.clustering().size());
+ // no-op read
+ }
+ }
+ rowIter.close();
+ }
+ }
+ };
+ }
+
+ private byte[] corruptSstable(RandomAccessFile raf, int position, int corruptionSize) throws IOException
+ {
+ byte[] backup = new byte[corruptionSize];
+ raf.seek(position);
+ raf.read(backup);
+
+ raf.seek(position);
+ byte[] corruption = new byte[corruptionSize];
+ random.nextBytes(corruption);
+ raf.write(corruption);
+
+ return backup;
+ }
+
+ private void restore(RandomAccessFile raf, int position, byte[] backup) throws IOException
+ {
+ raf.seek(position);
+ raf.write(backup);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/85cc3901/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
index a73a164..6f18461 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
@@ -23,15 +23,17 @@ import java.nio.ByteBuffer;
import org.junit.Test;
-import org.apache.cassandra.UpdateBuilder;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.*;
+import org.apache.cassandra.db.*;
import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.utils.FBUtilities;
+import static junit.framework.Assert.fail;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -197,4 +199,45 @@ public class SSTableWriterTest extends SSTableWriterTestBase
writer2.close();
}
}
+
+ @Test
+ public void testValueTooBigCorruption() throws InterruptedException
+ {
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_SMALL_MAX_VALUE);
+ truncate(cfs);
+
+ File dir = cfs.getDirectories().getDirectoryForNewSSTables();
+ LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.STREAM);
+
+ try (SSTableWriter writer1 = getWriter(cfs, dir, txn))
+ {
+ UpdateBuilder largeValue = UpdateBuilder.create(cfs.metadata, "large_value").withTimestamp(1);
+ largeValue.newRow("clustering").add("val", ByteBuffer.allocate(2 * 1024 * 1024));
+ writer1.append(largeValue.build().unfilteredIterator());
+
+ SSTableReader sstable = writer1.finish(true);
+
+ txn.update(sstable, false);
+
+ try
+ {
+ DecoratedKey dk = Util.dk("large_value");
+ UnfilteredRowIterator rowIter = sstable.iterator(dk, ColumnFilter.all(cfs.metadata), false, false);
+ while (rowIter.hasNext())
+ {
+ rowIter.next();
+ // no-op read, as values may not appear expected
+ }
+ fail("Expected a CorruptSSTableException to be thrown");
+ }
+ catch (CorruptSSTableException e)
+ {
+ }
+
+ txn.abort();
+ LifecycleTransaction.waitForDeletions();
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/85cc3901/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
index 2db92f7..f2c97c0 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java
@@ -31,6 +31,7 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
@@ -54,6 +55,7 @@ public class SSTableWriterTestBase extends SchemaLoader
protected static final String KEYSPACE = "SSTableRewriterTest";
protected static final String CF = "Standard1";
+ protected static final String CF_SMALL_MAX_VALUE = "Standard_SmallMaxValue";
private static Config.DiskAccessMode standardMode;
private static Config.DiskAccessMode indexMode;
@@ -73,7 +75,13 @@ public class SSTableWriterTestBase extends SchemaLoader
SchemaLoader.prepareServer();
SchemaLoader.createKeyspace(KEYSPACE,
KeyspaceParams.simple(1),
- SchemaLoader.standardCFMD(KEYSPACE, CF));
+ SchemaLoader.standardCFMD(KEYSPACE, CF),
+ SchemaLoader.standardCFMD(KEYSPACE, CF_SMALL_MAX_VALUE));
+
+ Keyspace keyspace = Keyspace.open(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_SMALL_MAX_VALUE);
+ for (ColumnDefinition cd : cfs.metadata.allColumns())
+ cd.type.setMaxValueSize(1024 * 1024); // set max value size to 1MB
}
@AfterClass