You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2018/06/05 20:04:06 UTC
cassandra git commit: Add option to sanity check tombstones on
reads/compaction
Repository: cassandra
Updated Branches:
refs/heads/trunk 4413fdbd3 -> 5d8767765
Add option to sanity check tombstones on reads/compaction
Patch by marcuse; reviewed by Ariel Weisberg for CASSANDRA-14467
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5d876776
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5d876776
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5d876776
Branch: refs/heads/trunk
Commit: 5d8767765090cd968c39008f76b0cd795d6e5032
Parents: 4413fdb
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue May 22 13:43:22 2018 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Tue Jun 5 12:47:20 2018 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 3 +
.../org/apache/cassandra/config/Config.java | 9 +-
.../cassandra/config/DatabaseDescriptor.java | 10 +
.../org/apache/cassandra/db/DeletionTime.java | 9 +
.../cassandra/db/UnfilteredValidation.java | 113 ++++++++++
.../columniterator/AbstractSSTableIterator.java | 2 +
.../db/columniterator/SSTableIterator.java | 2 +
.../columniterator/SSTableReversedIterator.java | 1 +
.../apache/cassandra/db/rows/AbstractCell.java | 7 +
.../apache/cassandra/db/rows/AbstractRow.java | 12 +
.../apache/cassandra/db/rows/ColumnData.java | 7 +
.../cassandra/db/rows/ComplexColumnData.java | 10 +
.../db/rows/RangeTombstoneBoundMarker.java | 5 +
.../db/rows/RangeTombstoneBoundaryMarker.java | 5 +
.../apache/cassandra/db/rows/Unfiltered.java | 6 +
.../io/sstable/SSTableIdentityIterator.java | 6 +-
.../io/sstable/format/SSTableReader.java | 6 +
.../cassandra/service/StorageService.java | 12 +
.../cassandra/service/StorageServiceMBean.java | 2 +
test/conf/cassandra.yaml | 1 +
.../config/DatabaseDescriptorRefTest.java | 1 +
.../cql3/validation/operations/TTLTest.java | 19 ++
.../db/compaction/CompactionsCQLTest.java | 223 +++++++++++++++++++
.../sstable/SSTableCorruptionDetectionTest.java | 5 +
25 files changed, 475 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 351ae37..eb064be 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0
+ * Add option to sanity check tombstones on reads/compactions (CASSANDRA-14467)
* Add a virtual table to expose all running sstable tasks (CASSANDRA-14457)
* Let nodetool import take a list of directories (CASSANDRA-14442)
* Avoid unneeded memory allocations / cpu for disabled log levels (CASSANDRA-14488)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 49c6f03..7ff056d 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1198,3 +1198,6 @@ audit_logging_options:
# included_users:
# excluded_users:
+# validate tombstones on reads and compaction
+# can be either "disabled", "warn" or "exception"
+# corrupted_tombstone_strategy: disabled
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index d945368..d9250bb 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -383,7 +383,7 @@ public class Config
public volatile AuditLogOptions audit_logging_options = new AuditLogOptions();
-
+ // volatile: this can be changed at runtime through DatabaseDescriptor.setCorruptedTombstoneStrategy
+ public volatile CorruptedTombstoneStrategy corrupted_tombstone_strategy = CorruptedTombstoneStrategy.disabled;
/**
* @deprecated migrate to {@link DatabaseDescriptor#isClientInitialized()}
*/
@@ -468,6 +468,13 @@ public class Config
reject
}
+ /**
+ * Reaction to a tombstone whose deletion data fails the sanity check done on reads/compaction.
+ */
+ public enum CorruptedTombstoneStrategy
+ {
+ // unfiltereds are not checked and invalid deletions are ignored
+ disabled,
+ // a rate limited warning is logged
+ warn,
+ // the sstable is marked suspect and a CorruptSSTableException is thrown
+ exception
+ }
+
private static final List<String> SENSITIVE_KEYS = new ArrayList<String>() {{
add("client_encryption_options");
add("server_encryption_options");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 592b96e..91ee63a 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2596,4 +2596,14 @@ public class DatabaseDescriptor
{
conf.audit_logging_options = auditLoggingOptions;
}
+
+ /**
+ * @return the currently configured corrupted tombstone strategy
+ */
+ public static Config.CorruptedTombstoneStrategy getCorruptedTombstoneStrategy()
+ {
+ return conf.corrupted_tombstone_strategy;
+ }
+
+ /**
+ * Replaces the configured corrupted tombstone strategy.
+ *
+ * @param strategy the strategy to use from now on
+ */
+ public static void setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy strategy)
+ {
+ conf.corrupted_tombstone_strategy = strategy;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/DeletionTime.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java
index 9dcbb07..14e846d 100644
--- a/src/java/org/apache/cassandra/db/DeletionTime.java
+++ b/src/java/org/apache/cassandra/db/DeletionTime.java
@@ -88,6 +88,15 @@ public class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory
HashingUtils.updateWithLong(hasher, markedForDeleteAt());
}
+ /**
+ * Sanity check of this deletion time: it is considered valid when localDeletionTime is not negative.
+ *
+ * @return true if localDeletionTime is greater than or equal to 0, false otherwise
+ */
+ public boolean validate()
+ {
+ return localDeletionTime >= 0;
+ }
+
@Override
public boolean equals(Object o)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/UnfilteredValidation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/UnfilteredValidation.java b/src/java/org/apache/cassandra/db/UnfilteredValidation.java
new file mode 100644
index 0000000..6d8bbfd
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/UnfilteredValidation.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.rows.Unfiltered;
+
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.NoSpamLogger;
+
+
+/**
+ * Handles unfiltered validation - if configured, it checks if the provided unfiltered has
+ * invalid deletions (if the local deletion time is negative or if the ttl is negative) and
+ * then either logs or throws an exception if so.
+ */
+public class UnfilteredValidation
+{
+ private static final Logger logger = LoggerFactory.getLogger(UnfilteredValidation.class);
+ private static final NoSpamLogger nospam1m = NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);
+
+ public static void maybeValidateUnfiltered(Unfiltered unfiltered, TableMetadata metadata, DecoratedKey key, SSTableReader sstable)
+ {
+ Config.CorruptedTombstoneStrategy strat = DatabaseDescriptor.getCorruptedTombstoneStrategy();
+ if (strat != Config.CorruptedTombstoneStrategy.disabled && unfiltered != null && !unfiltered.isEmpty())
+ {
+ boolean hasInvalidDeletions = false;
+ try
+ {
+ hasInvalidDeletions = unfiltered.hasInvalidDeletions();
+ }
+ catch (Throwable t) // make sure no unknown exceptions fail the read/compaction
+ {
+ nospam1m.error("Could not check if Unfiltered in {} had any invalid deletions", sstable, t);
+ }
+
+ if (hasInvalidDeletions)
+ {
+ String content;
+ try
+ {
+ content = unfiltered.toString(metadata, true);
+ }
+ catch (Throwable t)
+ {
+ content = "Could not get string representation: " + t.getMessage();
+ }
+ handleInvalid(metadata, key, sstable, content);
+ }
+ }
+ }
+
+ public static void handleInvalid(TableMetadata metadata, DecoratedKey key, SSTableReader sstable, String invalidContent)
+ {
+ Config.CorruptedTombstoneStrategy strat = DatabaseDescriptor.getCorruptedTombstoneStrategy();
+ String keyString;
+ try
+ {
+ keyString = metadata.partitionKeyType.getString(key.getKey());
+ }
+ catch (Throwable t)
+ {
+ keyString = "[corrupt token="+key.getToken()+"]";
+ }
+
+ if (strat == Config.CorruptedTombstoneStrategy.exception)
+ {
+ String msg = String.format("Key %s in %s.%s is invalid in %s: %s",
+ keyString,
+ metadata.keyspace,
+ metadata.name,
+ sstable,
+ invalidContent);
+ // we mark suspect to make sure this sstable is not included in future compactions - it would just keep
+ // throwing exceptions
+ sstable.markSuspect();
+ throw new CorruptSSTableException(new MarshalException(msg), sstable.getFilename());
+ }
+ else if (strat == Config.CorruptedTombstoneStrategy.warn)
+ {
+ String msgTemplate = String.format("Key {} in %s.%s is invalid in %s: {}",
+ metadata.keyspace,
+ metadata.name,
+ sstable);
+ nospam1m.warn(msgTemplate, keyString, invalidContent);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
index 9496878..443fe49 100644
--- a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
@@ -111,6 +111,8 @@ public abstract class AbstractSSTableIterator implements UnfilteredRowIterator
this.staticRow = Rows.EMPTY_STATIC_ROW;
this.reader = createReader(indexEntry, file, shouldCloseFile);
}
+ if (!partitionLevelDeletion.validate())
+ UnfilteredValidation.handleInvalid(metadata(), key, sstable, "partitionLevelDeletion="+partitionLevelDeletion.toString());
if (reader != null && !slices.isEmpty())
reader.setForSlice(nextSlice());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
index 6b8a3ad..9346345 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
@@ -150,6 +150,7 @@ public class SSTableIterator extends AbstractSSTableIterator
return null;
Unfiltered next = deserializer.readNext();
+ UnfilteredValidation.maybeValidateUnfiltered(next, metadata(), key, sstable);
// We may get empty row for the same reason expressed on UnfilteredSerializer.deserializeOne.
if (next.isEmpty())
continue;
@@ -299,6 +300,7 @@ public class SSTableIterator extends AbstractSSTableIterator
Unfiltered next = deserializer.readNext();
+ UnfilteredValidation.maybeValidateUnfiltered(next, metadata(), key, sstable);
// We may get empty row for the same reason expressed on UnfilteredSerializer.deserializeOne.
if (next.isEmpty())
continue;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
index 9a30d19..9f449a0 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
@@ -223,6 +223,7 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
&& !stopReadingDisk())
{
Unfiltered unfiltered = deserializer.readNext();
+ UnfilteredValidation.maybeValidateUnfiltered(unfiltered, metadata(), key, sstable);
// We may get empty row for the same reason expressed on UnfilteredSerializer.deserializeOne.
if (!unfiltered.isEmpty())
buffer.add(unfiltered);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/rows/AbstractCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractCell.java b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
index 4946a46..bfe7396 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractCell.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
@@ -154,6 +154,13 @@ public abstract class AbstractCell extends Cell
column().validateCell(this);
}
+ public boolean hasInvalidDeletions()
+ {
+ // invalid: negative ttl, negative local deletion time, or an expiring cell without a deletion time
+ return ttl() < 0 || localDeletionTime() < 0
+ || (isExpiring() && localDeletionTime() == NO_DELETION_TIME);
+ }
+
public long maxTimestamp()
{
return timestamp();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/rows/AbstractRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRow.java b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
index 211b13f..24b088f 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractRow.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
@@ -91,6 +91,18 @@ public abstract class AbstractRow extends AbstractCollection<ColumnData> impleme
cd.validate();
}
+ public boolean hasInvalidDeletions()
+ {
+ if (primaryKeyLivenessInfo().isExpiring() && (primaryKeyLivenessInfo().ttl() < 0 || primaryKeyLivenessInfo().localExpirationTime() < 0))
+ return true;
+ if (!deletion().time().validate())
+ return true;
+ for (ColumnData cd : this)
+ if (cd.hasInvalidDeletions())
+ return true;
+ return false;
+ }
+
public String toString(TableMetadata metadata)
{
return toString(metadata, false);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/rows/ColumnData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/ColumnData.java b/src/java/org/apache/cassandra/db/rows/ColumnData.java
index ccfcfa5..f2da132 100644
--- a/src/java/org/apache/cassandra/db/rows/ColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ColumnData.java
@@ -66,6 +66,13 @@ public abstract class ColumnData
public abstract void validate();
/**
+ * Validates the deletions (ttl and local deletion time) if any.
+ *
+ * @return true if it has any invalid deletions, false otherwise
+ */
+ public abstract boolean hasInvalidDeletions();
+
+ /**
* Adds the data to the provided digest.
*
* @param hasher the {@link Hasher} to add the data to.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
index 57851d8..3073e5f 100644
--- a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
@@ -138,6 +138,16 @@ public class ComplexColumnData extends ColumnData implements Iterable<Cell>
cell.digest(hasher);
}
+ public boolean hasInvalidDeletions()
+ {
+ // cells only need to be inspected when the complex deletion itself is valid
+ if (complexDeletion.validate())
+ {
+ for (Cell current : this)
+ if (current.hasInvalidDeletions())
+ return true;
+ return false;
+ }
+ return true;
+ }
+
public ComplexColumnData markCounterLocalToBeCleared()
{
return transformAndFilter(complexDeletion, Cell::markCounterLocalToBeCleared);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
index c0c6afd..094cf72 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
@@ -68,6 +68,11 @@ public class RangeTombstoneBoundMarker extends AbstractRangeTombstoneMarker<Clus
return false;
}
+ /**
+ * @return true if the deletion time of this marker does not pass its validate() check
+ */
+ public boolean hasInvalidDeletions()
+ {
+ return !deletionTime().validate();
+ }
+
/**
* The deletion time for the range tombstone this is a bound of.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
index 6e6c8cd..79d5b1a 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
@@ -116,6 +116,11 @@ public class RangeTombstoneBoundaryMarker extends AbstractRangeTombstoneMarker<C
return true;
}
+ public boolean hasInvalidDeletions()
+ {
+ // a boundary is fine only when both of its deletion times are valid
+ return !(startDeletion.validate() && endDeletion.validate());
+ }
+
public RangeTombstoneBoundaryMarker copy(AbstractAllocator allocator)
{
return new RangeTombstoneBoundaryMarker(clustering().copy(allocator), endDeletion, startDeletion);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/db/rows/Unfiltered.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Unfiltered.java b/src/java/org/apache/cassandra/db/rows/Unfiltered.java
index 3a65f4e..81b63b7 100644
--- a/src/java/org/apache/cassandra/db/rows/Unfiltered.java
+++ b/src/java/org/apache/cassandra/db/rows/Unfiltered.java
@@ -55,6 +55,12 @@ public interface Unfiltered extends Clusterable
*/
public void validateData(TableMetadata metadata);
+ /**
+ * Do a quick validation of the deletions of the unfiltered (if any)
+ *
+ * @return true if any deletion is invalid
+ */
+ public boolean hasInvalidDeletions();
public boolean isEmpty();
public String toString(TableMetadata metadata);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index f9c6e82..a49e7b4 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -55,6 +55,8 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
try
{
DeletionTime partitionLevelDeletion = DeletionTime.serializer.deserialize(file);
+ if (!partitionLevelDeletion.validate())
+ UnfilteredValidation.handleInvalid(sstable.metadata(), key, sstable, "partitionLevelDeletion="+partitionLevelDeletion.toString());
SerializationHelper helper = new SerializationHelper(sstable.metadata(), sstable.descriptor.version.correspondingMessagingVersion(), SerializationHelper.Flag.LOCAL);
SSTableSimpleIterator iterator = SSTableSimpleIterator.create(sstable.metadata(), file, sstable.header, helper, partitionLevelDeletion);
return new SSTableIdentityIterator(sstable, key, partitionLevelDeletion, file.getPath(), iterator);
@@ -169,7 +171,9 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
protected Unfiltered doCompute()
{
- return iterator.next();
+ Unfiltered unfiltered = iterator.next();
+ UnfilteredValidation.maybeValidateUnfiltered(unfiltered, metadata(), key, sstable);
+ return unfiltered;
}
public void close()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index ee7e445..2fade21 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -1729,6 +1729,12 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
isSuspect.getAndSet(true);
}
+ /**
+ * Resets the isSuspect flag of this sstable to false; only meant to be used by tests.
+ */
+ @VisibleForTesting
+ public void unmarkSuspect()
+ {
+ isSuspect.getAndSet(false);
+ }
+
public boolean isMarkedSuspect()
{
return isSuspect.get();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 4214644..96fd63f 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -63,6 +63,7 @@ import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.commitlog.CommitLog;
@@ -5471,4 +5472,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
return AuditLogManager.getInstance().isAuditingEnabled();
}
+
+ /**
+ * @return the string form of the corrupted tombstone strategy currently held by DatabaseDescriptor
+ */
+ public String getCorruptedTombstoneStrategy()
+ {
+ return DatabaseDescriptor.getCorruptedTombstoneStrategy().toString();
+ }
+
+ /**
+ * Switches the corrupted tombstone strategy at runtime.
+ *
+ * @param strategy name of a Config.CorruptedTombstoneStrategy constant, resolved with valueOf
+ */
+ public void setCorruptedTombstoneStrategy(String strategy)
+ {
+ Config.CorruptedTombstoneStrategy parsed = Config.CorruptedTombstoneStrategy.valueOf(strategy);
+ DatabaseDescriptor.setCorruptedTombstoneStrategy(parsed);
+ logger.info("Setting corrupted tombstone strategy to {}", strategy);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index ab165b3..e54a95e 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -683,4 +683,6 @@ public interface StorageServiceMBean extends NotificationEmitter
public void disableAuditLog();
public void enableAuditLog(String loggerName, String includedKeyspaces, String excludedKeyspaces, String includedCategories, String excludedCategories, String includedUsers, String excludedUsers) throws ConfigurationException;
public boolean isAuditLogEnabled();
+ /** Returns the corrupted tombstone strategy currently in use, as a string. */
+ public String getCorruptedTombstoneStrategy();
+ /** Sets the corrupted tombstone strategy from its string name. */
+ public void setCorruptedTombstoneStrategy(String strategy);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index 640f9b3..5893bab 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -45,3 +45,4 @@ row_cache_size_in_mb: 16
enable_user_defined_functions: true
enable_scripted_user_defined_functions: true
prepared_statements_cache_size_mb: 1
+corrupted_tombstone_strategy: exception
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index 4078e2a..68435a8 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -73,6 +73,7 @@ public class DatabaseDescriptorRefTest
"org.apache.cassandra.config.Config$MemtableAllocationType",
"org.apache.cassandra.config.Config$RepairCommandPoolFullStrategy",
"org.apache.cassandra.config.Config$UserFunctionTimeoutPolicy",
+ "org.apache.cassandra.config.Config$CorruptedTombstoneStrategy",
"org.apache.cassandra.config.ParameterizedClass",
"org.apache.cassandra.config.EncryptionOptions",
"org.apache.cassandra.config.EncryptionOptions$ClientEncryptionOptions",
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java
index fc70974..99ca7dc 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java
@@ -9,6 +9,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.Attributes;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.cql3.UntypedResultSet;
@@ -19,6 +21,8 @@ import org.apache.cassandra.db.rows.AbstractCell;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.utils.FBUtilities;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
public class TTLTest extends CQLTester
@@ -31,6 +35,18 @@ public class TTLTest extends CQLTester
public static final String SIMPLE_CLUSTERING = "table2";
public static final String COMPLEX_NOCLUSTERING = "table3";
public static final String COMPLEX_CLUSTERING = "table4";
+ private Config.CorruptedTombstoneStrategy corruptTombstoneStrategy;
+ // remember the strategy in effect before the test so that after() can put it back
+ @Before
+ public void before()
+ {
+ corruptTombstoneStrategy = DatabaseDescriptor.getCorruptedTombstoneStrategy();
+ }
+
+ // put back the strategy that was saved in before()
+ @After
+ public void after()
+ {
+ DatabaseDescriptor.setCorruptedTombstoneStrategy(corruptTombstoneStrategy);
+ }
@Test
public void testTTLPerRequestLimit() throws Throwable
@@ -167,9 +183,12 @@ public class TTLTest extends CQLTester
@Test
public void testRecoverOverflowedExpirationWithScrub() throws Throwable
{
+ // this test writes corrupt tombstones on purpose, so disable the strategy:
+ DatabaseDescriptor.setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy.disabled);
baseTestRecoverOverflowedExpiration(false, false);
baseTestRecoverOverflowedExpiration(true, false);
baseTestRecoverOverflowedExpiration(true, true);
+ // the corrupted tombstone strategy is reset after each test in @After above
}
public void testCapExpirationDateOverflowPolicy(ExpirationDateOverflowHandling.ExpirationDateOverflowPolicy policy) throws Throwable
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
index 4d5215e..ca420da 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
@@ -17,23 +17,38 @@
*/
package org.apache.cassandra.db.compaction;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.commons.lang.StringUtils;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.serializers.MarshalException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -42,6 +57,21 @@ public class CompactionsCQLTest extends CQLTester
public static final int SLEEP_TIME = 5000;
+ private Config.CorruptedTombstoneStrategy strategy;
+
+ // records the strategy that is in effect before the test runs
+ @Before
+ public void before()
+ {
+ strategy = DatabaseDescriptor.getCorruptedTombstoneStrategy();
+ }
+
+ @After
+ public void after()
+ {
+ // restore the strategy captured in before(); re-reading the current value here would be a no-op
+ DatabaseDescriptor.setCorruptedTombstoneStrategy(strategy);
+ }
+
+
@Test
public void testTriggerMinorCompactionSTCS() throws Throwable
{
@@ -245,6 +275,189 @@ public class CompactionsCQLTest extends CQLTester
testPerCFSNeverPurgeTombstonesHelper(false);
}
+    /**
+     * Flushes a range tombstone with a negative local deletion time and verifies that major compaction
+     * and reads in both clustering orders fail on it.
+     */
+    @Test
+    public void testCompactionInvalidRTs() throws Throwable
+    {
+        // these tests need the 'exception' strategy; set it explicitly so that a different default
+        // in test/conf/cassandra.yaml cannot make them fail
+        DatabaseDescriptor.setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy.exception);
+        prepare();
+        // local deletion times are never supplied by users and should not be negative
+        DeletionTime invalidDeletion = new DeletionTime(System.currentTimeMillis(), -1);
+        RangeTombstone corruptRT = new RangeTombstone(Slice.ALL, invalidDeletion);
+        new RowUpdateBuilder(getCurrentColumnFamilyStore().metadata(), System.currentTimeMillis() * 1000, 22)
+            .clustering(33)
+            .addRangeTombstone(corruptRT)
+            .build()
+            .apply();
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+        compactAndValidate();
+        readAndValidate(true);
+        readAndValidate(false);
+    }
+
+    /**
+     * Flushes a standard (cell) tombstone with a negative local deletion time and verifies that major
+     * compaction and reads in both clustering orders fail on it.
+     */
+    @Test
+    public void testCompactionInvalidTombstone() throws Throwable
+    {
+        DatabaseDescriptor.setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy.exception);
+        prepare();
+        // local deletion times are never supplied by users and should not be negative;
+        // delete column "b" of row (22, 33) using such an invalid value
+        new RowUpdateBuilder(getCurrentColumnFamilyStore().metadata(), -1, System.currentTimeMillis() * 1000, 22)
+            .clustering(33)
+            .delete("b")
+            .build()
+            .apply();
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+        compactAndValidate();
+        readAndValidate(true);
+        readAndValidate(false);
+    }
+
+    /**
+     * Flushes a partition deletion with a negative local deletion time and verifies that major
+     * compaction and reads in both clustering orders fail on it.
+     */
+    @Test
+    public void testCompactionInvalidPartitionDeletion() throws Throwable
+    {
+        DatabaseDescriptor.setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy.exception);
+        prepare();
+        // local deletion times are never supplied by users and should not be negative;
+        // build the deletion of partition 22 with nowInSec = -1
+        PartitionUpdate corruptDeletion = PartitionUpdate.simpleBuilder(getCurrentColumnFamilyStore().metadata(), 22)
+                                                         .nowInSec(-1)
+                                                         .delete()
+                                                         .build();
+        Mutation mutation = new Mutation(corruptDeletion);
+        mutation.apply();
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+        compactAndValidate();
+        readAndValidate(true);
+        readAndValidate(false);
+    }
+
+    /**
+     * Flushes a row deletion with a negative local deletion time and verifies that major compaction
+     * and reads in both clustering orders fail on it.
+     */
+    @Test
+    public void testCompactionInvalidRowDeletion() throws Throwable
+    {
+        DatabaseDescriptor.setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy.exception);
+        prepare();
+        // local deletion times are never supplied by users and should not be negative
+        final int invalidLocalDeletionTime = -1;
+        RowUpdateBuilder.deleteRowAt(getCurrentColumnFamilyStore().metadata(),
+                                     System.currentTimeMillis() * 1000,
+                                     invalidLocalDeletionTime,
+                                     22,
+                                     33)
+                        .apply();
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+        compactAndValidate();
+        readAndValidate(true);
+        readAndValidate(false);
+    }
+
+    /**
+     * Creates the test table and inserts two small rows, one in partition 0 and one in partition 1.
+     */
+    private void prepare() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int, id2 int, b text, primary key (id, id2))");
+        int key = 0;
+        while (key < 2)
+        {
+            execute("INSERT INTO %s (id, id2, b) VALUES (?, ?, ?)", key, key, String.valueOf(key));
+            key++;
+        }
+    }
+
+    /**
+     * Writes a wide partition plus a row deletion with a negative local deletion time, then verifies
+     * that reads in both clustering orders fail on it.
+     *
+     * The column index size is lowered for the duration of the test and always restored, even when
+     * an assertion fails, so the changed global setting cannot leak into other tests.
+     */
+    @Test
+    public void testIndexedReaderRowDeletion() throws Throwable
+    {
+        // write enough data to make sure we use an IndexedReader when doing a read, and make sure it fails when reading a corrupt row deletion
+        DatabaseDescriptor.setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy.exception);
+        int maxSizePre = DatabaseDescriptor.getColumnIndexSize();
+        DatabaseDescriptor.setColumnIndexSize(1024);
+        try
+        {
+            prepareWide();
+            RowUpdateBuilder.deleteRowAt(getCurrentColumnFamilyStore().metadata(), System.currentTimeMillis() * 1000, -1, 22, 33).apply();
+            getCurrentColumnFamilyStore().forceBlockingFlush();
+            readAndValidate(true);
+            readAndValidate(false);
+        }
+        finally
+        {
+            // restore the previous value on failure as well as on success
+            DatabaseDescriptor.setColumnIndexSize(maxSizePre);
+        }
+    }
+
+    /**
+     * Writes a wide partition plus a standard (cell) tombstone with a negative local deletion time,
+     * then verifies that reads in both clustering orders fail on it.
+     *
+     * The column index size is lowered for the duration of the test and always restored, even when
+     * an assertion fails, so the changed global setting cannot leak into other tests.
+     */
+    @Test
+    public void testIndexedReaderTombstone() throws Throwable
+    {
+        // write enough data to make sure we use an IndexedReader when doing a read, and make sure it fails when reading a corrupt standard tombstone
+        DatabaseDescriptor.setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy.exception);
+        int maxSizePre = DatabaseDescriptor.getColumnIndexSize();
+        DatabaseDescriptor.setColumnIndexSize(1024);
+        try
+        {
+            prepareWide();
+            RowUpdateBuilder rub = new RowUpdateBuilder(getCurrentColumnFamilyStore().metadata(), -1, System.currentTimeMillis() * 1000, 22).clustering(33).delete("b");
+            rub.build().apply();
+            getCurrentColumnFamilyStore().forceBlockingFlush();
+            readAndValidate(true);
+            readAndValidate(false);
+        }
+        finally
+        {
+            // restore the previous value on failure as well as on success
+            DatabaseDescriptor.setColumnIndexSize(maxSizePre);
+        }
+    }
+
+    /**
+     * Writes a wide partition plus a range tombstone with a negative local deletion time, then
+     * verifies that reads in both clustering orders fail on it.
+     *
+     * The column index size is lowered for the duration of the test and always restored, even when
+     * an assertion fails, so the changed global setting cannot leak into other tests.
+     */
+    @Test
+    public void testIndexedReaderRT() throws Throwable
+    {
+        // write enough data to make sure we use an IndexedReader when doing a read, and make sure it fails when reading a corrupt range tombstone
+        DatabaseDescriptor.setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy.exception);
+        int maxSizePre = DatabaseDescriptor.getColumnIndexSize();
+        DatabaseDescriptor.setColumnIndexSize(1024);
+        try
+        {
+            prepareWide();
+            RangeTombstone rt = new RangeTombstone(Slice.ALL, new DeletionTime(System.currentTimeMillis(), -1));
+            RowUpdateBuilder rub = new RowUpdateBuilder(getCurrentColumnFamilyStore().metadata(), System.currentTimeMillis() * 1000, 22).clustering(33).addRangeTombstone(rt);
+            rub.build().apply();
+            getCurrentColumnFamilyStore().forceBlockingFlush();
+            readAndValidate(true);
+            readAndValidate(false);
+        }
+        finally
+        {
+            // restore the previous value on failure as well as on success
+            DatabaseDescriptor.setColumnIndexSize(maxSizePre);
+        }
+    }
+
+    /**
+     * Creates the test table and fills partition 22 with 100 rows, each carrying a 70 character value.
+     */
+    private void prepareWide() throws Throwable
+    {
+        createTable("CREATE TABLE %s (id int, id2 int, b text, primary key (id, id2))");
+        // the value is the same for every row, so build it once
+        String payload = StringUtils.repeat("ABCDEFG", 10);
+        for (int clustering = 0; clustering < 100; clustering++)
+        {
+            execute("INSERT INTO %s (id, id2, b) VALUES (?, ?, ?)", 22, clustering, payload);
+        }
+    }
+
+    /**
+     * Runs a major compaction on the current table and asserts that it fails with a
+     * {@link MarshalException} somewhere in the cause chain whose message names the table and "Key 22".
+     * Afterwards every live sstable must be marked suspect; the marks are cleared again.
+     */
+    private void compactAndValidate()
+    {
+        Throwable failure = null;
+        try
+        {
+            getCurrentColumnFamilyStore().forceMajorCompaction();
+        }
+        catch (Throwable t)
+        {
+            failure = t;
+        }
+
+        if (failure != null)
+        {
+            // look for the MarshalException, starting at the thrown exception itself
+            MarshalException me = null;
+            for (Throwable link = failure; link != null; link = link.getCause())
+            {
+                if (link instanceof MarshalException)
+                {
+                    me = (MarshalException) link;
+                    break;
+                }
+            }
+            assertNotNull(me);
+            String message = me.getMessage();
+            assertTrue(message.contains(getCurrentColumnFamilyStore().metadata.keyspace+"."+getCurrentColumnFamilyStore().metadata.name));
+            assertTrue(message.contains("Key 22"));
+        }
+        assertTrue(failure != null);
+        assertSuspectAndReset(getCurrentColumnFamilyStore().getLiveSSTables());
+    }
+
+    /**
+     * Reads the current table and asserts that the corrupt data in partition 22 is detected.
+     *
+     * A read of partition 0 is issued first and must not throw. A full scan and a read of partition 22
+     * must then each throw a {@link CorruptSSTableException} with a {@link MarshalException} mentioning
+     * "Key 22" in its cause chain. After each failing read all live sstables must be marked suspect;
+     * the marks are cleared again.
+     *
+     * @param asc whether the single partition reads order by id2 ascending (true) or descending (false)
+     */
+    private void readAndValidate(boolean asc) throws Throwable
+    {
+        String direction = asc ? "ASC" : "DESC";
+        execute("select * from %s where id = 0 order by id2 "+direction);
+
+        // full scan: iterate the whole result so every partition is actually read
+        Throwable scanFailure = null;
+        try
+        {
+            for (UntypedResultSet.Row ignored : execute("select * from %s")) {}
+        }
+        catch (Throwable t)
+        {
+            scanFailure = t;
+        }
+        if (scanFailure != null)
+            assertCorruptionReportedForKey22(scanFailure);
+        assertSuspectAndReset(getCurrentColumnFamilyStore().getLiveSSTables());
+        assertTrue(scanFailure != null);
+
+        // single partition read of the partition holding the corrupt tombstone
+        Throwable partitionFailure = null;
+        try
+        {
+            execute("select * from %s where id = 22 order by id2 "+direction);
+        }
+        catch (Throwable t)
+        {
+            partitionFailure = t;
+        }
+        if (partitionFailure != null)
+            assertCorruptionReportedForKey22(partitionFailure);
+        assertTrue(partitionFailure != null);
+        assertSuspectAndReset(getCurrentColumnFamilyStore().getLiveSSTables());
+    }
+
+    /**
+     * Asserts that the given throwable is a {@link CorruptSSTableException} and that it, or one of its
+     * causes, is a {@link MarshalException} whose message contains "Key 22".
+     */
+    private static void assertCorruptionReportedForKey22(Throwable thrown)
+    {
+        assertTrue(thrown instanceof CorruptSSTableException);
+        MarshalException me = null;
+        for (Throwable link = thrown; link != null; link = link.getCause())
+        {
+            if (link instanceof MarshalException)
+            {
+                me = (MarshalException) link;
+                break;
+            }
+        }
+        assertNotNull(me);
+        assertTrue(me.getMessage().contains("Key 22"));
+    }
public void testPerCFSNeverPurgeTombstonesHelper(boolean deletedCell) throws Throwable
{
@@ -281,6 +494,16 @@ public class CompactionsCQLTest extends CQLTester
getCurrentColumnFamilyStore().truncateBlocking();
}
+    /**
+     * Asserts that the collection is non-empty and that every sstable in it is marked suspect,
+     * clearing the suspect mark of each sstable once it has been checked.
+     *
+     * @param sstables the sstables to check and reset
+     */
+    private void assertSuspectAndReset(Collection<SSTableReader> sstables)
+    {
+        assertFalse(sstables.isEmpty());
+        sstables.forEach(reader -> {
+            assertTrue(reader.isMarkedSuspect());
+            reader.unmarkSuspect();
+        });
+    }
+
private void assertTombstones(SSTableReader sstable, boolean expectTS)
{
boolean foundTombstone = false;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d876776/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java
index 581109c..2510c5e 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableCorruptionDetectionTest.java
@@ -70,10 +70,14 @@ public class SSTableCorruptionDetectionTest extends SSTableWriterTestBase
private static LifecycleTransaction txn;
private static ColumnFamilyStore cfs;
private static SSTableReader ssTableReader;
+ private static Config.CorruptedTombstoneStrategy original;
@BeforeClass
public static void setUp()
{
+ // this test writes corrupted data on purpose, disable corrupted tombstone detection
+ original = DatabaseDescriptor.getCorruptedTombstoneStrategy();
+ DatabaseDescriptor.setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy.disabled);
TableMetadata.Builder cfm =
TableMetadata.builder(keyspace, table)
.addPartitionKeyColumn("pk", AsciiType.instance)
@@ -127,6 +131,7 @@ public class SSTableCorruptionDetectionTest extends SSTableWriterTestBase
txn.abort();
writer.close();
+ DatabaseDescriptor.setCorruptedTombstoneStrategy(original);
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org